From 59c4c7575d3ff7d5de5a250b3d90827613e84e9d Mon Sep 17 00:00:00 2001 From: OpenIM-Robot Date: Fri, 25 Oct 2024 18:19:01 +0800 Subject: [PATCH] deps: Merge #2716 #2718 #2719 #2724 #2730 #2770 #2772 #2773 #2775 #2777 #2779 #2782 #2785 #2786 #2789 #2790 #2792 PRs into pre-release-v3.8.2 (#2797) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Line webhook (#2716) * feat: online and offline webhook * feat: online and offline webhook * feat: remove zk * fix: the message I sent is not set to read seq in mongodb (#2718) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb --------- Co-authored-by: withchao * fix: cannot modify group member avatars (#2719) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars --------- Co-authored-by: withchao * fix: auth package import twice (#2724) * fix: group level change logic (#2730) * feat: Add More Multi Login Policy (#2770) * feat: multiLogin * feat: change config * fix: join the group chat directly, notification type error (#2772) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification --------- Co-authored-by: withchao * fix: joinSource check args error. (#2773) Co-authored-by: Monet Lee * feat: change push config (#2775) * fix: change group member info send notification (#2777) * fix: client sends message status error to server (#2779) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification * fix: MsgData status --------- Co-authored-by: withchao * fix: improve setConversationAtInfo logic. (#2782) * fix: improve ConversationATInfo logic. * fix logic err. * fix: del UserB's conversation version cache when userA set conversation's isPrivateChat to true. (#2785) * chore: remove unused .chglog and unnecessary content in goreleaser (#2786) * Revert: Change group member roleLevel can`t send notification (#2789) * fix: change group member info send notification * fix: change group member info send notification * fix: group * fix: group * fix: group * fix: improve transfer Owner logic when newOwner is mute. (#2790) * fix: improve ConversationATInfo logic. * fix logic err. * fix: improve transfer Owner logic when newOwner is mute. * fix: improve getUserInfo logic. (#2792) * fix: improve ConversationATInfo logic. * fix logic err. * fix: improve transfer Owner logic when newOwner is mute. * fix: improve getUserInfo logic. --------- Co-authored-by: icey-yu <119291641+icey-yu@users.noreply.github.com> Co-authored-by: chao <48119764+withchao@users.noreply.github.com> Co-authored-by: withchao Co-authored-by: liangkai Co-authored-by: Alilestera <75608652+alilestera@users.noreply.github.com> Co-authored-by: Monet Lee Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> --- CHANGELOG/.chglog/CHANGELOG.tpl.md | 62 ----- CHANGELOG/.chglog/config.yml | 67 ----- build/goreleaser.yaml | 91 ------- config/discovery.yml | 5 - config/share.yml | 15 +- docker-compose.yml | 14 +- go.mod | 2 +- go.sum | 4 +- internal/msggateway/online.go | 13 + internal/msggateway/ws_server.go | 70 +++-- internal/push/a_test.go | 29 -- internal/push/offlinepush/offlinepusher.go | 7 +- internal/push/offlinepush_handler.go | 3 + internal/push/push_handler.go | 3 + internal/rpc/auth/auth.go | 9 +- internal/rpc/group/group.go | 107 ++++++-- internal/rpc/group/notification.go | 48 +++- internal/rpc/msg/send.go | 19 +- internal/rpc/user/user.go | 7 +- pkg/common/config/config.go | 26 +- pkg/common/storage/cache/cachekey/token.go | 19 +- pkg/common/storage/cache/redis/token.go | 64 ++++- pkg/common/storage/cache/token.go | 2 + pkg/common/storage/controller/auth.go | 251 ++++++++++++++---- pkg/common/storage/controller/conversation.go | 5 +- pkg/common/storage/controller/msg_transfer.go | 6 +- pkg/rpccache/user.go | 2 + pkg/rpcclient/auth.go | 21 +- 28 files changed, 561 insertions(+), 410 deletions(-) delete mode 100644 CHANGELOG/.chglog/CHANGELOG.tpl.md delete mode 100644 CHANGELOG/.chglog/config.yml delete mode 100644 internal/push/a_test.go diff --git a/CHANGELOG/.chglog/CHANGELOG.tpl.md b/CHANGELOG/.chglog/CHANGELOG.tpl.md deleted file mode 100644 index 100a29ed8..000000000 --- a/CHANGELOG/.chglog/CHANGELOG.tpl.md +++ /dev/null @@ -1,62 +0,0 @@ -# Version logging for OpenIM - - - - - -{{ if .Versions -}} - -## [Unreleased] - -{{ if .Unreleased.CommitGroups -}} -{{ range .Unreleased.CommitGroups -}} -### {{ .Title }} -{{ range .Commits -}} -- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }} -{{ end }} -{{ end -}} -{{ end -}} -{{ end -}} - -{{ range .Versions }} - -## {{ if .Tag.Previous }}[{{ .Tag.Name }}]{{ else }}{{ .Tag.Name }}{{ end }} - {{ datetime "2006-01-02" .Tag.Date }} -{{ range .CommitGroups -}} -### {{ .Title }} -{{ range .Commits -}} -- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }} -{{ end }} -{{ end -}} - -{{- if .RevertCommits -}} -### Reverts -{{ range .RevertCommits -}} -- {{ .Revert.Header }} -{{ end }} -{{ end -}} - -{{- if .MergeCommits -}} -### Pull Requests -{{ range .MergeCommits -}} -- {{ .Header }} -{{ end }} -{{ end -}} - -{{- if .NoteGroups -}} -{{ range .NoteGroups -}} -### {{ .Title }} -{{ range .Notes }} -{{ .Body }} -{{ end }} -{{ end -}} -{{ end -}} -{{ end -}} - -{{- if .Versions }} -[Unreleased]: {{ .Info.RepositoryURL }}/compare/{{ $latest := index .Versions 0 }}{{ $latest.Tag.Name }}...HEAD -{{ range .Versions -}} -{{ if .Tag.Previous -}} -[{{ .Tag.Name }}]: {{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }} -{{ end -}} -{{ end -}} -{{ end -}} \ No newline at end of file diff --git a/CHANGELOG/.chglog/config.yml b/CHANGELOG/.chglog/config.yml deleted file mode 100644 index 2a45bc26a..000000000 --- a/CHANGELOG/.chglog/config.yml +++ /dev/null @@ -1,67 +0,0 @@ -bin: git -style: github -template: CHANGELOG.tpl.md -info: - title: CHANGELOG - repository_url: https://github.com/openimsdk/open-im-server -options: - tag_filter_pattern: '^v' - sort: "date" - - commits: - filters: - Type: - - feat - - fix - - perf - - refactor - - docs - - test - - chore - - ci - - build - sort_by: Scope - - commit_groups: - group_by: Type - sort_by: Title - title_order: - - feat - - fix - - perf - - refactor - - docs - - test - - chore - - ci - - build - title_maps: - feat: Features - - header: - pattern: "" - pattern_maps: - - PropName - - issues: - prefix: - - # - - refs: - actions: - - Closes - - Fixes - - merges: - pattern: "^Merge branch '(\\w+)'$" - pattern_maps: - - Source - - reverts: - pattern: "^Revert \"([\\s\\S]*)\"$" - pattern_maps: - - Header - - notes: - keywords: - - BREAKING CHANGE \ No newline at end of file diff --git a/build/goreleaser.yaml b/build/goreleaser.yaml index 93fe9f4c8..c24fb65da 100644 --- a/build/goreleaser.yaml +++ b/build/goreleaser.yaml @@ -53,15 +53,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-cmdutils id: openim-cmdutils @@ -71,15 +64,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-crontask id: openim-crontask @@ -89,15 +75,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-msggateway id: openim-msggateway @@ -107,15 +86,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-msgtransfer id: openim-msgtransfer @@ -125,15 +97,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-push id: openim-push @@ -143,15 +108,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-auth id: openim-rpc-auth @@ -161,15 +119,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-conversation id: openim-rpc-conversation @@ -179,15 +130,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-friend id: openim-rpc-friend @@ -197,15 +141,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-group id: openim-rpc-group @@ -215,15 +152,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-msg id: openim-rpc-msg @@ -233,15 +163,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-third id: openim-rpc-third @@ -251,15 +174,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" - binary: openim-rpc-user id: openim-rpc-user @@ -269,15 +185,8 @@ builds: - windows - linux goarch: - - s390x - - mips64 - - mips64le - amd64 - - ppc64le - arm64 - goarm: - - "6" - - "7" # TODO:Need a script, such as the init - release to help binary to find the right directory diff --git a/config/discovery.yml b/config/discovery.yml index 78a36f3d1..6e68cbff4 100644 --- a/config/discovery.yml +++ b/config/discovery.yml @@ -5,9 +5,4 @@ etcd: username: '' password: '' -zookeeper: - schema: openim - address: [ localhost:12181 ] - username: '' - password: '' diff --git a/config/share.yml b/config/share.yml index 5f8521eaa..7d977ae15 100644 --- a/config/share.yml +++ b/config/share.yml @@ -13,4 +13,17 @@ rpcRegisterName: imAdminUserID: [ imAdmin ] # 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time -multiLoginPolicy: 1 +multiLogin: + policy: 1 + maxNumOneEnd: 30 + customizeLoginNum: + ios: 1 + android: 1 + windows: 1 + osx: 1 + web: 1 + miniWeb: 1 + linux: 1 + aPad: 1 + iPad: 1 + admin: 1 diff --git a/docker-compose.yml b/docker-compose.yml index edac65b13..6d88bac10 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,19 +43,6 @@ services: networks: - openim - zookeeper: - image: "${ZOOKEEPER_IMAGE}" - container_name: zookeeper - ports: - - "12181:2181" - environment: - #JVMFLAGS: "-Xms32m -Xmx128m" - TZ: "Asia/Shanghai" - ALLOW_ANONYMOUS_LOGIN: "yes" - restart: always - networks: - - openim - etcd: image: "${ETCD_IMAGE}" container_name: etcd @@ -142,6 +129,7 @@ services: # image: ${PROMETHEUS_IMAGE} # container_name: prometheus # restart: always +# user: root # volumes: # - ./config/prometheus.yml:/etc/prometheus/prometheus.yml # - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml diff --git a/go.mod b/go.mod index 09c626bc7..b982bc7d0 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72 + github.com/openimsdk/protocol v0.0.72-alpha.46 github.com/openimsdk/tools v0.0.50-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 00ecc7ed7..df9cf5194 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72 h1:K+vslwaR7lDXyBzb07UuEQITaqsgighz7NyXVIWsu6A= -github.com/openimsdk/protocol v0.0.72/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc= +github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/msggateway/online.go b/internal/msggateway/online.go index 27b4544aa..f29869b6e 100644 --- a/internal/msggateway/online.go +++ b/internal/msggateway/online.go @@ -90,6 +90,19 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) { if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil { log.ZError(ctx, "update user online status", err) } + for _, ss := range req.Status { + for _, online := range ss.Online { + client, _, _ := ws.clients.Get(ss.UserID, int(online)) + back := false + if len(client) > 0 { + back = client[0].IsBackground + } + ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, ss.UserID, int(online), back, ss.ConnID) + } + for _, offline := range ss.Offline { + ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, ss.UserID, int(offline), ss.ConnID) + } + } } for i := 0; i < concurrent; i++ { diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 7df297488..b92d7eb44 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package msggateway import ( @@ -212,7 +198,6 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C if err != nil { return err } - wg := errgroup.Group{} wg.SetLimit(concurrentRequest) @@ -321,8 +306,32 @@ func (ws *WsServer) KickUserConn(client *Client) error { } func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) { - switch ws.msgGatewayConfig.Share.MultiLoginPolicy { + kickTokenFunc := func(kickClients []*Client) { + var kickTokens []string + ws.clients.DeleteClients(newClient.UserID, kickClients) + for _, c := range kickClients { + kickTokens = append(kickTokens, c.token) + err := c.KickOnlineMessage() + if err != nil { + log.ZWarn(c.ctx, "KickOnlineMessage", err) + } + } + ctx := mcontext.WithMustInfoCtx( + []string{newClient.ctx.GetOperationID(), newClient.ctx.GetUserID(), + constant.PlatformIDToName(newClient.PlatformID), newClient.ctx.GetConnID()}, + ) + if _, err := ws.authClient.KickTokens(ctx, kickTokens); err != nil { + log.ZWarn(newClient.ctx, "kickTokens err", err) + } + } + + switch ws.msgGatewayConfig.Share.MultiLogin.Policy { case constant.DefalutNotKick: + case constant.WebAndOther: + if constant.PlatformIDToClass(newClient.PlatformID) == constant.WebPlatformStr { + return + } + fallthrough case constant.PCAndOther: if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { return @@ -347,6 +356,35 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien log.ZWarn(newClient.ctx, "InvalidateToken err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) } + case constant.PcMobileAndWeb: + clients, ok := ws.clients.GetAll(newClient.UserID) + if !ok { + return + } + var ( + kickClients []*Client + ) + for _, client := range clients { + if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) { + kickClients = append(kickClients, client) + } + } + kickTokenFunc(kickClients) + + case constant.SingleTerminalLogin: + clients, ok := ws.clients.GetAll(newClient.UserID) + if !ok { + return + } + var ( + kickClients []*Client + ) + for _, client := range clients { + kickClients = append(kickClients, client) + } + kickTokenFunc(kickClients) + case constant.Customize: + // todo } } diff --git a/internal/push/a_test.go b/internal/push/a_test.go deleted file mode 100644 index 8b2d86407..000000000 --- a/internal/push/a_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package push - -import ( - "github.com/openimsdk/protocol/sdkws" - "testing" -) - -func TestName(t *testing.T) { - var c ConsumerHandler - c.readCh = make(chan *sdkws.MarkAsReadTips) - - go c.loopRead() - - go func() { - for i := 0; ; i++ { - seq := int64(i + 1) - if seq%3 == 0 { - seq = 1 - } - c.readCh <- &sdkws.MarkAsReadTips{ - ConversationID: "c100", - MarkAsReadUserID: "u100", - HasReadSeq: seq, - } - } - }() - - select {} -} diff --git a/internal/push/offlinepush/offlinepusher.go b/internal/push/offlinepush/offlinepusher.go index 9aa6625de..d655a924a 100644 --- a/internal/push/offlinepush/offlinepusher.go +++ b/internal/push/offlinepush/offlinepusher.go @@ -23,10 +23,13 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "strings" ) const ( - geTUI = "geTui" + geTUI = "getui" firebase = "fcm" jPush = "jpush" ) @@ -38,6 +41,7 @@ type OfflinePusher interface { func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache, fcmConfigPath string) (OfflinePusher, error) { var offlinePusher OfflinePusher + pushConf.Enable = strings.ToLower(pushConf.Enable) switch pushConf.Enable { case geTUI: offlinePusher = getui.NewClient(pushConf, cache) @@ -47,6 +51,7 @@ func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache, fcmConfigPa offlinePusher = jpush.NewClient(pushConf) default: offlinePusher = dummy.NewClient() + log.ZWarn(mcontext.WithMustInfoCtx([]string{"push start", "admin", "admin", ""}), "Unknown push config", nil) } return offlinePusher, nil } diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index bf69aed3e..a80c147f4 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -55,6 +55,9 @@ func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, log.ZError(ctx, "offline push msg is empty", errs.New("offlinePushMsg is empty"), "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData) return } + if offlinePushMsg.MsgData.Status == constant.MsgStatusSending { + offlinePushMsg.MsgData.Status = constant.MsgStatusSendSuccess + } log.ZInfo(ctx, "receive to OfflinePush MQ", "userIDs", offlinePushMsg.UserIDs, "msg", offlinePushMsg.MsgData) err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 4ecf20de5..41ad5962a 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -194,6 +194,9 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat } func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { + if msg != nil && msg.Status == constant.MsgStatusSending { + msg.Status = constant.MsgStatusSendSuccess + } onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs) if err != nil { return nil, err diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 06ae89d97..62df74d21 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -65,7 +65,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, - config.Share.MultiLoginPolicy, + config.Share.MultiLogin, ), config: config, }) @@ -230,3 +230,10 @@ func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.Invalidate } return &pbauth.InvalidateTokenResp{}, nil } + +func (s *authServer) KickTokens(ctx context.Context, req *pbauth.KickTokensReq) (*pbauth.KickTokensResp, error) { + if err := s.authDatabase.BatchSetTokenMapByUidPid(ctx, req.Tokens); err != nil { + return nil, err + } + return &pbauth.KickTokensResp{}, nil +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 80d1c9b2f..ef917d539 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -465,7 +465,7 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite return nil, err } - if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InvitedUserIDs...); err != nil { + if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, opUserID, req.InvitedUserIDs...); err != nil { return nil, err } return &pbgroup.InviteUserToGroupResp{}, nil @@ -1180,36 +1180,53 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans if err != nil { return nil, err } + if group.Status == constant.GroupStatusDismissed { return nil, servererrs.ErrDismissedAlready.Wrap() } + if req.OldOwnerUserID == req.NewOwnerUserID { return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID") } + members, err := g.db.FindGroupMembers(ctx, req.GroupID, []string{req.OldOwnerUserID, req.NewOwnerUserID}) if err != nil { return nil, err } + if err := g.PopulateGroupMember(ctx, members...); err != nil { return nil, err } + memberMap := datautil.SliceToMap(members, func(e *model.GroupMember) string { return e.UserID }) if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 { return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ",")) } + oldOwner := memberMap[req.OldOwnerUserID] if oldOwner == nil { return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID not in group " + req.NewOwnerUserID) } + newOwner := memberMap[req.NewOwnerUserID] if newOwner == nil { return nil, errs.ErrArgs.WrapMsg("NewOwnerUser not in group " + req.NewOwnerUserID) } + if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) { if !(mcontext.GetOpUserID(ctx) == oldOwner.UserID && oldOwner.RoleLevel == constant.GroupOwner) { return nil, errs.ErrNoPermission.WrapMsg("no permission transfer group owner") } } + + if newOwner.MuteEndTime != time.Unix(0, 0) { + if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{ + GroupID: group.GroupID, + UserID: req.NewOwnerUserID}); err != nil { + return nil, err + } + } + if err := g.db.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil { return nil, err } @@ -1217,6 +1234,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans g.webhookAfterTransferGroupOwner(ctx, &g.config.WebhooksConfig.AfterTransferGroupOwner, req) g.notification.GroupOwnerTransferredNotification(ctx, req) + return &pbgroup.TransferGroupOwnerResp{}, nil } @@ -1425,32 +1443,38 @@ func (g *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbgroup.Ca if err != nil { return nil, err } + if err := g.PopulateGroupMember(ctx, member); err != nil { return nil, err } + if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) { opMember, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx)) if err != nil { return nil, err } + switch member.RoleLevel { case constant.GroupOwner: - return nil, errs.ErrNoPermission.WrapMsg("set group owner mute") + return nil, errs.ErrNoPermission.WrapMsg("Can not set group owner unmute") case constant.GroupAdmin: if opMember.RoleLevel != constant.GroupOwner { - return nil, errs.ErrNoPermission.WrapMsg("set group admin mute") + return nil, errs.ErrNoPermission.WrapMsg("Can not set group admin unmute") } case constant.GroupOrdinaryUsers: if !(opMember.RoleLevel == constant.GroupAdmin || opMember.RoleLevel == constant.GroupOwner) { - return nil, errs.ErrNoPermission.WrapMsg("set group ordinary users mute") + return nil, errs.ErrNoPermission.WrapMsg("Can not set group ordinary users unmute") } } } + data := UpdateGroupMemberMutedTimeMap(time.Unix(0, 0)) if err := g.db.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil { return nil, err } + g.notification.GroupMemberCancelMutedNotification(ctx, req.GroupID, req.UserID) + return &pbgroup.CancelMuteGroupMemberResp{}, nil } @@ -1485,9 +1509,6 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr return nil, errs.ErrNoPermission.WrapMsg("no op user id") } isAppManagerUid := authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) - for i := range req.Members { - req.Members[i].FaceURL = nil - } groupMembers := make(map[string][]*pbgroup.SetGroupMemberInfo) for i, member := range req.Members { if member.RoleLevel != nil { @@ -1529,29 +1550,61 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr case 0: if !isAppManagerUid { roleLevel := dbMembers[opUserIndex].RoleLevel - if roleLevel != constant.GroupOwner { - switch roleLevel { - case constant.GroupAdmin: - for _, member := range dbMembers { - if member.RoleLevel == constant.GroupOwner { - return nil, errs.ErrNoPermission.WrapMsg("admin can not change group owner") - } - if member.RoleLevel == constant.GroupAdmin && member.UserID != opUserID { - return nil, errs.ErrNoPermission.WrapMsg("admin can not change other group admin") - } + var ( + dbSelf = &model.GroupMember{} + reqSelf *pbgroup.SetGroupMemberInfo + ) + switch roleLevel { + case constant.GroupOwner: + for _, member := range dbMembers { + if member.UserID == opUserID { + dbSelf = member + break } - case constant.GroupOrdinaryUsers: - for _, member := range dbMembers { - if !(member.RoleLevel == constant.GroupOrdinaryUsers && member.UserID == opUserID) { - return nil, errs.ErrNoPermission.WrapMsg("ordinary users can not change other role level") - } + } + case constant.GroupAdmin: + for _, member := range dbMembers { + if member.UserID == opUserID { + dbSelf = member } - default: - for _, member := range dbMembers { - if member.RoleLevel >= roleLevel { - return nil, errs.ErrNoPermission.WrapMsg("can not change higher role level") - } + if member.RoleLevel == constant.GroupOwner { + return nil, errs.ErrNoPermission.WrapMsg("admin can not change group owner") } + if member.RoleLevel == constant.GroupAdmin && member.UserID != opUserID { + return nil, errs.ErrNoPermission.WrapMsg("admin can not change other group admin") + } + } + case constant.GroupOrdinaryUsers: + for _, member := range dbMembers { + if member.UserID == opUserID { + dbSelf = member + } + if !(member.RoleLevel == constant.GroupOrdinaryUsers && member.UserID == opUserID) { + return nil, errs.ErrNoPermission.WrapMsg("ordinary users can not change other role level") + } + } + default: + for _, member := range dbMembers { + if member.UserID == opUserID { + dbSelf = member + } + if member.RoleLevel >= roleLevel { + return nil, errs.ErrNoPermission.WrapMsg("can not change higher role level") + } + } + } + for _, member := range req.Members { + if member.UserID == opUserID { + reqSelf = member + break + } + } + if reqSelf != nil && reqSelf.RoleLevel != nil { + if reqSelf.RoleLevel.GetValue() > dbSelf.RoleLevel { + return nil, errs.ErrNoPermission.WrapMsg("can not improve role level by self") + } + if roleLevel == constant.GroupOwner { + return nil, errs.ErrArgs.WrapMsg("group owner can not change own role level") // Prevent the absence of a group owner } } } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 64e922fe2..54a6146f5 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -38,6 +38,7 @@ import ( "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/stringutil" "go.mongodb.org/mongo-driver/mongo" + "time" ) // GroupApplicationReceiver @@ -572,8 +573,51 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c return nil } -func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error { - return g.GroupApplicationAgreeMemberEnterNotification(ctx, groupID, "", entrantUserID...) +func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) error { + var err error + defer func() { + if err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) + } + }() + + if !g.config.RpcConfig.EnableHistoryForNewMembers { + conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) + maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) + if err != nil { + return err + } + if _, err = g.msgRpcClient.SetUserConversationsMinSeq(ctx, &msg.SetUserConversationsMinSeqReq{ + UserIDs: []string{entrantUserID}, + ConversationID: conversationID, + Seq: maxSeq, + }); err != nil { + return err + } + } + + if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, []string{entrantUserID}); err != nil { + return err + } + + var group *sdkws.GroupInfo + group, err = g.getGroupInfo(ctx, groupID) + if err != nil { + return err + } + user, err := g.getGroupMember(ctx, groupID, entrantUserID) + if err != nil { + return err + } + + tips := &sdkws.MemberEnterTips{ + Group: group, + EntrantUser: user, + OperationTime: time.Now().UnixMilli(), + } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips) + return nil } func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index a16ca5665..2c3f8c0a3 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -29,7 +29,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/stringutil" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { @@ -80,13 +79,17 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq) func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) { log.ZDebug(nctx, "setConversationAtInfo", "msg", msg) + ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) + var atUserID []string + conversation := &pbconversation.ConversationReq{ ConversationID: msgprocessor.GetConversationIDByMsg(msg), ConversationType: msg.SessionType, GroupID: msg.GroupID, } + tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...) if tagAll { memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID) @@ -94,25 +97,35 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa log.ZWarn(ctx, "GetGroupMemberIDs", err) return } - atUserID = stringutil.DifferenceString([]string{constant.AtAllString}, msg.AtUserIDList) + + memberUserIDList = datautil.DeleteElems(memberUserIDList, msg.SendID) + + atUserID = datautil.Single([]string{constant.AtAllString}, msg.AtUserIDList) + if len(atUserID) == 0 { // just @everyone conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} } else { // @Everyone and @other people conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} + err = m.Conversation.SetConversations(ctx, atUserID, conversation) if err != nil { log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation) } - memberUserIDList = stringutil.DifferenceString(atUserID, memberUserIDList) + + memberUserIDList = datautil.Single(atUserID, memberUserIDList) } + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} + err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation) if err != nil { log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation) } + return } conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} + err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation) if err != nil { log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 4669ed513..b47c516d9 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -116,18 +116,17 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) { resp = &pbuser.GetDesignateUsersResp{} - users, err := s.db.FindWithError(ctx, req.UserIDs) + users, err := s.db.Find(ctx, req.UserIDs) if err != nil { return nil, err } + resp.UsersInfo = convert.UsersDB2Pb(users) return resp, nil } // deprecated: - -//UpdateUserInfo - +// UpdateUserInfo func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) { resp = &pbuser.UpdateUserInfoResp{} err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 77fcbb8aa..da6c63d60 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -361,11 +361,29 @@ type AfterConfig struct { } type Share struct { - Secret string `mapstructure:"secret"` - RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` - IMAdminUserID []string `mapstructure:"imAdminUserID"` - MultiLoginPolicy int `mapstructure:"multiLoginPolicy"` + Secret string `mapstructure:"secret"` + RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` + IMAdminUserID []string `mapstructure:"imAdminUserID"` + MultiLogin MultiLogin `mapstructure:"multiLogin"` } + +type MultiLogin struct { + Policy int `mapstructure:"policy"` + MaxNumOneEnd int `mapstructure:"maxNumOneEnd"` + CustomizeLoginNum struct { + IOS int `mapstructure:"ios"` + Android int `mapstructure:"android"` + Windows int `mapstructure:"windows"` + OSX int `mapstructure:"osx"` + Web int `mapstructure:"web"` + MiniWeb int `mapstructure:"miniWeb"` + Linux int `mapstructure:"linux"` + APad int `mapstructure:"aPad"` + IPad int `mapstructure:"iPad"` + Admin int `mapstructure:"admin"` + } `mapstructure:"customizeLoginNum"` +} + type RpcRegisterName struct { User string `mapstructure:"user"` Friend string `mapstructure:"friend"` diff --git a/pkg/common/storage/cache/cachekey/token.go b/pkg/common/storage/cache/cachekey/token.go index 94468dc31..83ba2f211 100644 --- a/pkg/common/storage/cache/cachekey/token.go +++ b/pkg/common/storage/cache/cachekey/token.go @@ -1,6 +1,9 @@ package cachekey -import "github.com/openimsdk/protocol/constant" +import ( + "github.com/openimsdk/protocol/constant" + "strings" +) const ( UidPidToken = "UID_PID_TOKEN_STATUS:" @@ -9,3 +12,17 @@ const ( func GetTokenKey(userID string, platformID int) string { return UidPidToken + userID + ":" + constant.PlatformIDToName(platformID) } + +func GetAllPlatformTokenKey(userID string) []string { + res := make([]string, len(constant.PlatformID2Name)) + for k := range constant.PlatformID2Name { + res[k-1] = GetTokenKey(userID, k) + } + return res +} + +func GetPlatformIDByTokenKey(key string) int { + splitKey := strings.Split(key, ":") + platform := splitKey[len(splitKey)-1] + return constant.PlatformNameToID(platform) +} diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 24e9c3005..998b4f1c9 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -1,17 +1,3 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package redis import ( @@ -21,6 +7,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" "strconv" + "sync" "time" ) @@ -67,6 +54,43 @@ func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, p return mm, nil } +func (c *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) { + var ( + res = make(map[int]map[string]int) + resLock = sync.Mutex{} + ) + + keys := cachekey.GetAllPlatformTokenKey(userID) + if err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { + pipe := c.rdb.Pipeline() + mapRes := make([]*redis.MapStringStringCmd, len(keys)) + for i, key := range keys { + mapRes[i] = pipe.HGetAll(ctx, key) + } + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + for i, m := range mapRes { + mm := make(map[string]int) + for k, v := range m.Val() { + state, err := strconv.Atoi(v) + if err != nil { + return errs.WrapMsg(err, "redis token value is not int", "value", v, "userID", userID) + } + mm[k] = state + } + resLock.Lock() + res[cachekey.GetPlatformIDByTokenKey(keys[i])] = mm + resLock.Unlock() + } + return nil + }); err != nil { + return nil, err + } + return res, nil +} + func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error { mm := make(map[string]any) for k, v := range m { @@ -75,6 +99,18 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), mm).Err()) } +func (c *tokenCache) BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]int) error { + pipe := c.rdb.Pipeline() + for k, v := range tokens { + pipe.HSet(ctx, k, v) + } + _, err := pipe.Exec(ctx) + if err != nil { + return errs.Wrap(err) + } + return nil +} + func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error { return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err()) } diff --git a/pkg/common/storage/cache/token.go b/pkg/common/storage/cache/token.go index 4a0fee087..ee0004d7f 100644 --- a/pkg/common/storage/cache/token.go +++ b/pkg/common/storage/cache/token.go @@ -9,6 +9,8 @@ type TokenModel interface { // SetTokenFlagEx set token and flag with expire time SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) + GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error + BatchSetTokenMapByUidPid(ctx context.Context, tokens map[string]map[string]int) error DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index 94f18b3ae..de8f93462 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -1,21 +1,9 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/log" "github.com/golang-jwt/jwt/v4" @@ -32,18 +20,41 @@ type AuthDatabase interface { // Create token CreateToken(ctx context.Context, userID string, platformID int) (string, error) + BatchSetTokenMapByUidPid(ctx context.Context, tokens []string) error + SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error } -type authDatabase struct { - cache cache.TokenModel - accessSecret string - accessExpire int64 - multiLoginPolicy int +type multiLoginConfig struct { + Policy int + MaxNumOneEnd int + CustomizeLoginNum map[int]int } -func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64, policy int) AuthDatabase { - return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLoginPolicy: policy} +type authDatabase struct { + cache cache.TokenModel + accessSecret string + accessExpire int64 + multiLogin multiLoginConfig +} + +func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64, multiLogin config.MultiLogin) AuthDatabase { + return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLogin: multiLoginConfig{ + Policy: multiLogin.Policy, + MaxNumOneEnd: multiLogin.MaxNumOneEnd, + CustomizeLoginNum: map[int]int{ + constant.IOSPlatformID: multiLogin.CustomizeLoginNum.IOS, + constant.AndroidPlatformID: multiLogin.CustomizeLoginNum.Android, + constant.WindowsPlatformID: multiLogin.CustomizeLoginNum.Windows, + constant.OSXPlatformID: multiLogin.CustomizeLoginNum.OSX, + constant.WebPlatformID: multiLogin.CustomizeLoginNum.Web, + constant.MiniWebPlatformID: multiLogin.CustomizeLoginNum.MiniWeb, + constant.LinuxPlatformID: multiLogin.CustomizeLoginNum.Linux, + constant.AndroidPadPlatformID: multiLogin.CustomizeLoginNum.APad, + constant.IPadPlatformID: multiLogin.CustomizeLoginNum.IPad, + constant.AdminPlatformID: multiLogin.CustomizeLoginNum.Admin, + }, + }} } // If the result is empty. @@ -55,22 +66,38 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p return a.cache.SetTokenMapByUidPid(ctx, userID, platformID, m) } +func (a *authDatabase) BatchSetTokenMapByUidPid(ctx context.Context, tokens []string) error { + setMap := make(map[string]map[string]int) + for _, token := range tokens { + claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(a.accessSecret)) + key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID) + if err != nil { + continue + } else { + if v, ok := setMap[key]; ok { + v[token] = constant.KickedToken + } else { + setMap[key] = map[string]int{ + token: constant.KickedToken, + } + } + } + } + if err := a.cache.BatchSetTokenMapByUidPid(ctx, setMap); err != nil { + return err + } + return nil +} + // Create Token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { - // todo: get all platform token - tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) + tokens, err := a.cache.GetAllTokensWithoutError(ctx, userID) if err != nil { return "", err } - var deleteTokenKey []string - var kickedTokenKey []string - for k, v := range tokens { - t, err := tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) - if err != nil || v != constant.NormalToken { - deleteTokenKey = append(deleteTokenKey, k) - } else if a.checkKickToken(ctx, platformID, t) { - kickedTokenKey = append(kickedTokenKey, k) - } + deleteTokenKey, kickedTokenKey, err := a.checkToken(ctx, tokens, platformID) + if err != nil { + return "", err } if len(deleteTokenKey) != 0 { err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) @@ -78,16 +105,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", err } } - - const adminTokenMaxNum = 30 - if platformID == constant.AdminPlatformID { - if len(kickedTokenKey) > adminTokenMaxNum { - kickedTokenKey = kickedTokenKey[:len(kickedTokenKey)-adminTokenMaxNum] - } else { - kickedTokenKey = nil - } - } - if len(kickedTokenKey) != 0 { for _, k := range kickedTokenKey { err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) @@ -111,22 +128,140 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return tokenString, nil } -func (a *authDatabase) checkKickToken(ctx context.Context, platformID int, token *tokenverify.Claims) bool { - switch a.multiLoginPolicy { - case constant.DefalutNotKick: - return false - case constant.PCAndOther: - if constant.PlatformIDToClass(platformID) == constant.TerminalPC || - constant.PlatformIDToClass(token.PlatformID) == constant.TerminalPC { - return false +func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string]int, platformID int) ([]string, []string, error) { + // todo: Move the logic for handling old data to another location. + var ( + loginTokenMap = make(map[int][]string) // The length of the value of the map must be greater than 0 + deleteToken = make([]string, 0) + kickToken = make([]string, 0) + adminToken = make([]string, 0) + unkickTerminal = "" + ) + + for plfID, tks := range tokens { + for k, v := range tks { + _, err := tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) + if err != nil || v != constant.NormalToken { + deleteToken = append(deleteToken, k) + } else { + if plfID != constant.AdminPlatformID { + loginTokenMap[plfID] = append(loginTokenMap[plfID], k) + } else { + adminToken = append(adminToken, k) + } + } } - return true - case constant.AllLoginButSameTermKick: - if platformID == token.PlatformID { - return true - } - return false - default: - return false } + + switch a.multiLogin.Policy { + case constant.DefalutNotKick: + for plt, ts := range loginTokenMap { + l := len(ts) + if platformID == plt { + l++ + } + limit := a.multiLogin.MaxNumOneEnd + if l > limit { + kickToken = append(kickToken, ts[:l-limit]...) + } + } + case constant.AllLoginButSameTermKick: + for plt, ts := range loginTokenMap { + kickToken = append(kickToken, ts[:len(ts)-1]...) + if plt == platformID { + kickToken = append(kickToken, ts[len(ts)-1]) + } + } + case constant.SingleTerminalLogin: + for _, ts := range loginTokenMap { + kickToken = append(kickToken, ts...) + } + case constant.WebAndOther: + unkickTerminal = constant.WebPlatformStr + fallthrough + case constant.PCAndOther: + if unkickTerminal == "" { + unkickTerminal = constant.TerminalPC + } + if constant.PlatformIDToClass(platformID) != unkickTerminal { + for plt, ts := range loginTokenMap { + if constant.PlatformIDToClass(plt) != unkickTerminal { + kickToken = append(kickToken, ts...) + } + } + } else { + var ( + preKick []string + isReserve = true + ) + for plt, ts := range loginTokenMap { + if constant.PlatformIDToClass(plt) != unkickTerminal { + // Keep a token from another end + if isReserve { + isReserve = false + kickToken = append(kickToken, ts[:len(ts)-1]...) + preKick = append(preKick, ts[len(ts)-1]) + continue + } else { + // Prioritize keeping Android + if plt == constant.AndroidPlatformID { + kickToken = append(kickToken, preKick...) + kickToken = append(kickToken, ts[:len(ts)-1]...) + } else { + kickToken = append(kickToken, ts...) + } + } + } + } + } + case constant.PcMobileAndWeb: + var ( + reserved = make(map[string]bool) + ) + + for plt, ts := range loginTokenMap { + if constant.PlatformIDToClass(plt) == constant.PlatformIDToClass(platformID) { + kickToken = append(kickToken, ts...) + } else { + if !reserved[constant.PlatformIDToClass(plt)] { + reserved[constant.PlatformIDToClass(plt)] = true + kickToken = append(kickToken, ts[:len(ts)-1]...) + continue + } else { + kickToken = append(kickToken, ts...) + } + } + } + + case constant.Customize: + if a.multiLogin.CustomizeLoginNum[platformID] <= 0 { + return nil, nil, errs.New("Do not allow login on this end").Wrap() + } + for plt, ts := range loginTokenMap { + l := len(ts) + if platformID == plt { + l++ + } + // a.multiLogin.CustomizeLoginNum[platformID] must > 0 + limit := min(a.multiLogin.CustomizeLoginNum[plt], a.multiLogin.MaxNumOneEnd) + if l > limit { + kickToken = append(kickToken, ts[:l-limit]...) + } + } + default: + return nil, nil, errs.New("unknown multiLogin policy").Wrap() + } + + var adminTokenMaxNum = a.multiLogin.MaxNumOneEnd + if a.multiLogin.Policy == constant.Customize { + adminTokenMaxNum = a.multiLogin.CustomizeLoginNum[constant.AdminPlatformID] + } + l := len(adminToken) + if platformID == constant.AdminPlatformID { + l++ + } + if l > adminTokenMaxNum { + kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) + } + return deleteToken, kickToken, nil } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 06a073365..f0b7d70db 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -16,9 +16,10 @@ package controller import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -194,7 +195,7 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() for _, conversation := range conversations { - cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID) + cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID, conversation.UserID) for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { ownerUserID := v[0] userID := v[1] diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 5e540a2c3..c6013dbc1 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -2,6 +2,7 @@ package controller import ( "context" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -83,6 +84,9 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, } } + if msg.Status == constant.MsgStatusSending { + msg.Status = constant.MsgStatusSendSuccess + } msgs[i] = &model.MsgDataModel{ SendID: msg.SendID, RecvID: msg.RecvID, @@ -254,7 +258,7 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { + if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil { return err } } diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go index 7c676f30a..79a768597 100644 --- a/pkg/rpccache/user.go +++ b/pkg/rpccache/user.go @@ -16,6 +16,7 @@ package rpccache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" @@ -97,6 +98,7 @@ func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([] user, err := u.GetUserInfo(ctx, userID) if err != nil { if errs.ErrRecordNotFound.Is(err) { + log.ZWarn(ctx, "User info notFound", err, "userID", userID) continue } return nil, err diff --git a/pkg/rpcclient/auth.go b/pkg/rpcclient/auth.go index 6665936bd..05fec35a0 100644 --- a/pkg/rpcclient/auth.go +++ b/pkg/rpcclient/auth.go @@ -16,8 +16,8 @@ package rpcclient import ( "context" + "github.com/openimsdk/protocol/auth" - pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/system/program" "google.golang.org/grpc" @@ -38,8 +38,8 @@ type Auth struct { discov discovery.SvcDiscoveryRegistry } -func (a *Auth) ParseToken(ctx context.Context, token string) (*pbAuth.ParseTokenResp, error) { - req := pbAuth.ParseTokenReq{ +func (a *Auth) ParseToken(ctx context.Context, token string) (*auth.ParseTokenResp, error) { + req := auth.ParseTokenReq{ Token: token, } resp, err := a.Client.ParseToken(ctx, &req) @@ -49,8 +49,8 @@ func (a *Auth) ParseToken(ctx context.Context, token string) (*pbAuth.ParseToken return resp, err } -func (a *Auth) InvalidateToken(ctx context.Context, preservedToken, userID string, platformID int) (*pbAuth.InvalidateTokenResp, error) { - req := pbAuth.InvalidateTokenReq{ +func (a *Auth) InvalidateToken(ctx context.Context, preservedToken, userID string, platformID int) (*auth.InvalidateTokenResp, error) { + req := auth.InvalidateTokenReq{ PreservedToken: preservedToken, UserID: userID, PlatformID: int32(platformID), @@ -61,3 +61,14 @@ func (a *Auth) InvalidateToken(ctx context.Context, preservedToken, userID strin } return resp, err } + +func (a *Auth) KickTokens(ctx context.Context, tokens []string) (*auth.KickTokensResp, error) { + req := auth.KickTokensReq{ + Tokens: tokens, + } + resp, err := a.Client.KickTokens(ctx, &req) + if err != nil { + return nil, err + } + return resp, err +}