mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
refactor: refactor the code of push and optimization.
This commit is contained in:
parent
c7067bc48a
commit
9660556f35
2
go.mod
2
go.mod
@ -4,7 +4,7 @@ go 1.19
|
||||
|
||||
require (
|
||||
firebase.google.com/go v3.13.0+incompatible
|
||||
github.com/OpenIMSDK/protocol v0.0.47
|
||||
github.com/OpenIMSDK/protocol v0.0.49
|
||||
github.com/OpenIMSDK/tools v0.0.27
|
||||
github.com/bwmarrin/snowflake v0.3.0 // indirect
|
||||
github.com/dtm-labs/rockscache v0.1.1
|
||||
|
12
go.sum
12
go.sum
@ -15,13 +15,11 @@ cloud.google.com/go/storage v1.30.1 h1:uOdMxAs8HExqBlnLtnQyP0YkvbiDpdGShGKtx6U/o
|
||||
cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E=
|
||||
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
|
||||
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
|
||||
github.com/AndrewZuo01/protocol v0.0.0-20240112093520-fd9c53e27b94 h1:o86vkek41ZrQqoBGqyKvS0z6N0uJj64mpzK72OkDZVM=
|
||||
github.com/AndrewZuo01/protocol v0.0.0-20240112093520-fd9c53e27b94/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
|
||||
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
|
||||
github.com/OpenIMSDK/tools v0.0.23 h1:xozfrGzhbpNPlDTap5DLVPk+JfgZ/ZyIj4Cuu3/bm9w=
|
||||
github.com/OpenIMSDK/tools v0.0.23/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
|
||||
github.com/OpenIMSDK/protocol v0.0.49 h1:wcqJOMBis7f153zNI7V82Fc4WyqA1GanMgXUQgL618k=
|
||||
github.com/OpenIMSDK/protocol v0.0.49/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
|
||||
github.com/OpenIMSDK/tools v0.0.27 h1:J/kSRqM+y9U4XK/pQ9RkEB31oQ5BTYD1oA5r1PITPRA=
|
||||
github.com/OpenIMSDK/tools v0.0.27/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
|
||||
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
|
||||
@ -93,7 +91,6 @@ github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
|
||||
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
|
||||
github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0=
|
||||
github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
|
||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
|
||||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
@ -203,10 +200,8 @@ github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
|
||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
|
||||
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
|
||||
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v2wtGp9Gmz1Ze3eVRAWJMLokvN3QjdzCHLY=
|
||||
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
@ -259,7 +254,6 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
|
||||
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
|
||||
github.com/mozillazg/go-httpheader v0.4.0 h1:aBn6aRXtFzyDLZ4VIRLsZbbJloagQfMnCiYgOq6hK4w=
|
||||
github.com/mozillazg/go-httpheader v0.4.0/go.mod h1:PuT8h0pw6efvp8ZeUec1Rs7dwjK08bt6gKSReGMqtdA=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
@ -390,7 +384,6 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@ -475,7 +468,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
|
||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -25,8 +25,6 @@ import (
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
@ -59,7 +57,7 @@ type Server struct {
|
||||
rpcPort int
|
||||
prometheusPort int
|
||||
LongConnServer LongConnServer
|
||||
pushTerminal []int
|
||||
pushTerminal map[int]struct{}
|
||||
}
|
||||
|
||||
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
|
||||
@ -67,12 +65,15 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
|
||||
}
|
||||
|
||||
func NewServer(rpcPort int, proPort int, longConnServer LongConnServer) *Server {
|
||||
return &Server{
|
||||
s := &Server{
|
||||
rpcPort: rpcPort,
|
||||
prometheusPort: proPort,
|
||||
LongConnServer: longConnServer,
|
||||
pushTerminal: []int{constant.IOSPlatformID, constant.AndroidPlatformID},
|
||||
pushTerminal: make(map[int]struct{}),
|
||||
}
|
||||
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
|
||||
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) OnlinePushMsg(
|
||||
@ -126,13 +127,9 @@ func (s *Server) OnlineBatchPushOneMsg(
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *Server) SuperGroupOnlineBatchPushOneMsg(
|
||||
ctx context.Context,
|
||||
req *msggateway.OnlineBatchPushOneMsgReq,
|
||||
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
|
||||
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
|
||||
|
||||
var singleUserResults []*msggateway.SingleMsgToUserResults
|
||||
|
||||
for _, v := range req.PushToUserIDs {
|
||||
var resp []*msggateway.SingleMsgToUserPlatform
|
||||
results := &msggateway.SingleMsgToUserResults{
|
||||
@ -153,23 +150,22 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(
|
||||
}
|
||||
|
||||
userPlatform := &msggateway.SingleMsgToUserPlatform{
|
||||
RecvID: v,
|
||||
RecvPlatFormID: int32(client.PlatformID),
|
||||
PlatFormID: int32(client.PlatformID),
|
||||
}
|
||||
if !client.IsBackground ||
|
||||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
||||
err := client.PushMessage(ctx, req.MsgData)
|
||||
if err != nil {
|
||||
userPlatform.ResultCode = -2
|
||||
userPlatform.ResultCode = int64(errs.ErrPushMsgErr.Code())
|
||||
resp = append(resp, userPlatform)
|
||||
} else {
|
||||
if utils.IsContainInt(client.PlatformID, s.pushTerminal) {
|
||||
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
||||
results.OnlinePush = true
|
||||
resp = append(resp, userPlatform)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
userPlatform.ResultCode = -3
|
||||
userPlatform.ResultCode = int64(errs.ErrIOSBackgroundPushErr.Code())
|
||||
resp = append(resp, userPlatform)
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
@ -136,3 +137,14 @@ func callbackBeforeSuperGroupOnlinePush(
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func GetContent(msg *sdkws.MsgData) string {
|
||||
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
|
||||
var notification sdkws.NotificationElem
|
||||
if err := json.Unmarshal(msg.Content, ¬ification); err != nil {
|
||||
return ""
|
||||
}
|
||||
return notification.Detail
|
||||
} else {
|
||||
return string(msg.Content)
|
||||
}
|
||||
}
|
||||
|
@ -1,32 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
type Consumer struct {
|
||||
pushCh ConsumerHandler
|
||||
successCount uint64
|
||||
}
|
||||
|
||||
func NewConsumer(pusher *Pusher) *Consumer {
|
||||
return &Consumer{
|
||||
pushCh: *NewConsumerHandler(pusher),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Consumer) Start() {
|
||||
// statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to
|
||||
// msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
|
||||
go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&c.pushCh)
|
||||
}
|
@ -20,7 +20,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
)
|
||||
|
||||
func NewClient() *Dummy {
|
||||
func NewDummy() *Dummy {
|
||||
return &Dummy{}
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ type Fcm struct {
|
||||
cache cache.MsgModel
|
||||
}
|
||||
|
||||
func NewClient(cache cache.MsgModel) *Fcm {
|
||||
func NewFcm(cache cache.MsgModel) *Fcm {
|
||||
projectRoot := config.GetProjectRoot()
|
||||
credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount)
|
||||
opt := option.WithCredentialsFile(credentialsFilePath)
|
||||
|
@ -55,17 +55,17 @@ const (
|
||||
taskIDTTL = 1000 * 60 * 60 * 24
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
type GeTui struct {
|
||||
cache cache.MsgModel
|
||||
tokenExpireTime int64
|
||||
taskIDTTL int64
|
||||
}
|
||||
|
||||
func NewClient(cache cache.MsgModel) *Client {
|
||||
return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
|
||||
func NewGeTui(cache cache.MsgModel) *GeTui {
|
||||
return &GeTui{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL}
|
||||
}
|
||||
|
||||
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error {
|
||||
func (g *GeTui) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error {
|
||||
token, err := g.cache.GetGetuiToken(ctx)
|
||||
if err != nil {
|
||||
if errs.Unwrap(err) == redis.Nil {
|
||||
@ -111,7 +111,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
|
||||
func (g *GeTui) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) {
|
||||
h := sha256.New()
|
||||
h.Write(
|
||||
[]byte(config.Config.Push.GeTui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.GeTui.MasterSecret),
|
||||
@ -131,7 +131,7 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir
|
||||
return respAuth.Token, int64(expire), err
|
||||
}
|
||||
|
||||
func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) {
|
||||
func (g *GeTui) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) {
|
||||
respTask := TaskResp{}
|
||||
ttl := int64(1000 * 60 * 5)
|
||||
pushReq.Settings = &Settings{TTL: &ttl}
|
||||
@ -143,7 +143,7 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (
|
||||
}
|
||||
|
||||
// max num is 999.
|
||||
func (g *Client) batchPush(ctx context.Context, token string, userIDs []string, pushReq PushReq) error {
|
||||
func (g *GeTui) batchPush(ctx context.Context, token string, userIDs []string, pushReq PushReq) error {
|
||||
taskID, err := g.GetTaskID(ctx, token, pushReq)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -152,21 +152,21 @@ func (g *Client) batchPush(ctx context.Context, token string, userIDs []string,
|
||||
return g.request(ctx, batchPushURL, pushReq, token, nil)
|
||||
}
|
||||
|
||||
func (g *Client) singlePush(ctx context.Context, token, userID string, pushReq PushReq) error {
|
||||
func (g *GeTui) singlePush(ctx context.Context, token, userID string, pushReq PushReq) error {
|
||||
operationID := mcontext.GetOperationID(ctx)
|
||||
pushReq.RequestID = &operationID
|
||||
pushReq.Audience = &Audience{Alias: []string{userID}}
|
||||
return g.request(ctx, pushURL, pushReq, token, nil)
|
||||
}
|
||||
|
||||
func (g *Client) request(ctx context.Context, url string, input any, token string, output any) error {
|
||||
func (g *GeTui) request(ctx context.Context, url string, input any, token string, output any) error {
|
||||
header := map[string]string{"token": token}
|
||||
resp := &Resp{}
|
||||
resp.Data = output
|
||||
return g.postReturn(ctx, config.Config.Push.GeTui.PushUrl+url, header, input, resp, 3)
|
||||
}
|
||||
|
||||
func (g *Client) postReturn(
|
||||
func (g *GeTui) postReturn(
|
||||
ctx context.Context,
|
||||
url string,
|
||||
header map[string]string,
|
||||
@ -181,7 +181,7 @@ func (g *Client) postReturn(
|
||||
return output.parseError()
|
||||
}
|
||||
|
||||
func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err error) {
|
||||
func (g *GeTui) getTokenAndSave2Redis(ctx context.Context) (token string, err error) {
|
||||
token, _, err = g.Auth(ctx, time.Now().UnixNano()/1e6)
|
||||
if err != nil {
|
||||
return
|
||||
@ -193,7 +193,7 @@ func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err e
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
|
||||
func (g *GeTui) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
|
||||
pushReq.Settings = &Settings{TTL: &g.taskIDTTL}
|
||||
taskID, err = g.GetTaskID(ctx, token, pushReq)
|
||||
if err != nil {
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
type JPush struct{}
|
||||
|
||||
func NewClient() *JPush {
|
||||
func NewJPush() *JPush {
|
||||
return &JPush{}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,18 @@ package offlinepush
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
)
|
||||
|
||||
const (
|
||||
GETUI = "getui"
|
||||
FIREBASE = "fcm"
|
||||
JPUSH = "jpush"
|
||||
)
|
||||
|
||||
// OfflinePusher Offline Pusher.
|
||||
@ -23,6 +35,21 @@ type OfflinePusher interface {
|
||||
Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error
|
||||
}
|
||||
|
||||
func NewOfflinePusher(cache cache.MsgModel) OfflinePusher {
|
||||
var offlinePusher OfflinePusher
|
||||
switch config.Config.Push.Enable {
|
||||
case GETUI:
|
||||
offlinePusher = getui.NewGeTui(cache)
|
||||
case FIREBASE:
|
||||
offlinePusher = fcm.NewFcm(cache)
|
||||
case JPUSH:
|
||||
offlinePusher = jpush.NewJPush()
|
||||
default:
|
||||
offlinePusher = dummy.NewDummy()
|
||||
}
|
||||
return offlinePusher
|
||||
}
|
||||
|
||||
// Opts opts.
|
||||
type Opts struct {
|
||||
Signal *Signal
|
211
internal/push/onlinepusher.go
Normal file
211
internal/push/onlinepusher.go
Normal file
@ -0,0 +1,211 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/protocol/msggateway"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
ENVNAME = "ENVS_DISCOVERY"
|
||||
KUBERNETES = "k8s"
|
||||
ZOOKEEPER = "zookeeper"
|
||||
)
|
||||
|
||||
type OnlinePusher interface {
|
||||
GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error)
|
||||
GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults,
|
||||
pushToUserIDs *[]string) []string
|
||||
}
|
||||
|
||||
type emptyOnlinePUsher struct{}
|
||||
|
||||
func newEmptyOnlinePUsher() *emptyOnlinePUsher {
|
||||
return &emptyOnlinePUsher{}
|
||||
}
|
||||
|
||||
func (emptyOnlinePUsher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
log.ZWarn(ctx, "emptyOnlinePUsher GetConnsAndOnlinePush", nil)
|
||||
return nil, nil
|
||||
}
|
||||
func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
|
||||
wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
|
||||
log.ZWarn(ctx, "emptyOnlinePUsher GetOnlinePushFailedUserIDs", nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewOnlinePusher(disCov discoveryregistry.SvcDiscoveryRegistry) OnlinePusher {
|
||||
var envType string
|
||||
if value := os.Getenv(ENVNAME); value != "" {
|
||||
envType = os.Getenv(ENVNAME)
|
||||
} else {
|
||||
envType = config.Config.Envs.Discovery
|
||||
}
|
||||
switch envType {
|
||||
case KUBERNETES:
|
||||
return NewK8sStaticConsistentHash(disCov)
|
||||
case ZOOKEEPER:
|
||||
return NewDefaultAllNode(disCov)
|
||||
default:
|
||||
return newEmptyOnlinePUsher()
|
||||
}
|
||||
}
|
||||
|
||||
type DefaultAllNode struct {
|
||||
disCov discoveryregistry.SvcDiscoveryRegistry
|
||||
}
|
||||
|
||||
func NewDefaultAllNode(disCov discoveryregistry.SvcDiscoveryRegistry) *DefaultAllNode {
|
||||
return &DefaultAllNode{disCov: disCov}
|
||||
}
|
||||
|
||||
func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
conns, err := d.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg = errgroup.Group{}
|
||||
input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}
|
||||
maxWorkers = config.Config.Push.MaxConcurrentWorkers
|
||||
)
|
||||
|
||||
if maxWorkers < 3 {
|
||||
maxWorkers = 3
|
||||
}
|
||||
|
||||
wg.SetLimit(maxWorkers)
|
||||
|
||||
// Online push message
|
||||
for _, conn := range conns {
|
||||
conn := conn // loop var safe
|
||||
wg.Go(func() error {
|
||||
msgClient := msggateway.NewMsgGatewayClient(conn)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "push result", "reply", reply)
|
||||
if reply != nil && reply.SinglePushResult != nil {
|
||||
mu.Lock()
|
||||
wsResults = append(wsResults, reply.SinglePushResult...)
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
_ = wg.Wait()
|
||||
|
||||
// always return nil
|
||||
return wsResults, nil
|
||||
}
|
||||
|
||||
func (d *DefaultAllNode) GetOnlinePushFailedUserIDs(_ context.Context, msg *sdkws.MsgData,
|
||||
wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
|
||||
|
||||
onlineSuccessUserIDs := []string{msg.SendID}
|
||||
for _, v := range wsResults {
|
||||
//message sender do not need offline push
|
||||
if msg.SendID == v.UserID {
|
||||
continue
|
||||
}
|
||||
// mobile online push success
|
||||
if v.OnlinePush {
|
||||
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return utils.SliceSub(*pushToUserIDs, onlineSuccessUserIDs)
|
||||
}
|
||||
|
||||
type K8sStaticConsistentHash struct {
|
||||
disCov discoveryregistry.SvcDiscoveryRegistry
|
||||
}
|
||||
|
||||
func NewK8sStaticConsistentHash(disCov discoveryregistry.SvcDiscoveryRegistry) *K8sStaticConsistentHash {
|
||||
return &K8sStaticConsistentHash{disCov: disCov}
|
||||
}
|
||||
|
||||
func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
|
||||
pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
|
||||
var usersHost = make(map[string][]string)
|
||||
for _, v := range pushToUserIDs {
|
||||
tHost, err := k.disCov.GetUserIdHashGatewayHost(ctx, v)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get msg gateway hash error", err)
|
||||
return nil, err
|
||||
}
|
||||
tUsers, tbl := usersHost[tHost]
|
||||
if tbl {
|
||||
tUsers = append(tUsers, v)
|
||||
usersHost[tHost] = tUsers
|
||||
} else {
|
||||
usersHost[tHost] = []string{v}
|
||||
}
|
||||
}
|
||||
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
|
||||
var usersConns = make(map[*grpc.ClientConn][]string)
|
||||
for host, userIds := range usersHost {
|
||||
tconn, _ := k.disCov.GetConn(ctx, host)
|
||||
usersConns[tconn] = userIds
|
||||
}
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg = errgroup.Group{}
|
||||
maxWorkers = config.Config.Push.MaxConcurrentWorkers
|
||||
)
|
||||
if maxWorkers < 3 {
|
||||
maxWorkers = 3
|
||||
}
|
||||
wg.SetLimit(maxWorkers)
|
||||
for conn, userIds := range usersConns {
|
||||
tcon := conn
|
||||
tuserIds := userIds
|
||||
wg.Go(func() error {
|
||||
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
|
||||
msgClient := msggateway.NewMsgGatewayClient(tcon)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
log.ZDebug(ctx, "push result", "reply", reply)
|
||||
if reply != nil && reply.SinglePushResult != nil {
|
||||
mu.Lock()
|
||||
wsResults = append(wsResults, reply.SinglePushResult...)
|
||||
mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
_ = wg.Wait()
|
||||
return wsResults, nil
|
||||
}
|
||||
func (k *K8sStaticConsistentHash) GetOnlinePushFailedUserIDs(_ context.Context, _ *sdkws.MsgData,
|
||||
wsResults []*msggateway.SingleMsgToUserResults, _ *[]string) []string {
|
||||
var needOfflinePushUserIDs []string
|
||||
for _, v := range wsResults {
|
||||
if !v.OnlinePush {
|
||||
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
|
||||
}
|
||||
}
|
||||
return needOfflinePushUserIDs
|
||||
}
|
51
internal/push/push.go
Normal file
51
internal/push/push.go
Normal file
@ -0,0 +1,51 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
pbpush "github.com/OpenIMSDK/protocol/push"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type pushServer struct {
|
||||
database controller.PushDatabase
|
||||
disCov discoveryregistry.SvcDiscoveryRegistry
|
||||
offlinePusher offlinepush.OfflinePusher
|
||||
pushCh *ConsumerHandler
|
||||
}
|
||||
|
||||
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
|
||||
//todo reserved Interface
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p pushServer) DelUserPushToken(ctx context.Context,
|
||||
req *pbpush.DelUserPushTokenReq) (resp *pbpush.DelUserPushTokenResp, err error) {
|
||||
if err = p.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbpush.DelUserPushTokenResp{}, nil
|
||||
}
|
||||
|
||||
func Start(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewMsgCacheModel(rdb)
|
||||
offlinePusher := offlinepush.NewOfflinePusher(cacheModel)
|
||||
database := controller.NewPushDatabase(cacheModel)
|
||||
|
||||
consumer := NewConsumerHandler(offlinePusher, rdb, disCov)
|
||||
pbpush.RegisterPushMsgServiceServer(server, &pushServer{
|
||||
database: database,
|
||||
disCov: disCov,
|
||||
offlinePusher: offlinePusher,
|
||||
pushCh: consumer,
|
||||
})
|
||||
go consumer.pushConsumerGroup.RegisterHandleAndConsumer(consumer)
|
||||
return nil
|
||||
}
|
@ -16,6 +16,16 @@ package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -31,18 +41,31 @@ import (
|
||||
)
|
||||
|
||||
type ConsumerHandler struct {
|
||||
pushConsumerGroup *kfk.MConsumerGroup
|
||||
pusher *Pusher
|
||||
pushConsumerGroup *kfk.MConsumerGroup
|
||||
offlinePusher offlinepush.OfflinePusher
|
||||
onlinePusher OnlinePusher
|
||||
groupLocalCache *rpccache.GroupLocalCache
|
||||
conversationLocalCache *rpccache.ConversationLocalCache
|
||||
msgRpcClient rpcclient.MessageRpcClient
|
||||
conversationRpcClient rpcclient.ConversationRpcClient
|
||||
groupRpcClient rpcclient.GroupRpcClient
|
||||
}
|
||||
|
||||
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
|
||||
func NewConsumerHandler(offlinePusher offlinepush.OfflinePusher,
|
||||
rdb redis.UniversalClient, disCov discoveryregistry.SvcDiscoveryRegistry) *ConsumerHandler {
|
||||
var consumerHandler ConsumerHandler
|
||||
consumerHandler.pusher = pusher
|
||||
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
|
||||
KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
|
||||
}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
|
||||
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
||||
consumerHandler.offlinePusher = offlinePusher
|
||||
consumerHandler.onlinePusher = NewOnlinePusher(disCov)
|
||||
consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(disCov)
|
||||
consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, rdb)
|
||||
consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(disCov)
|
||||
consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(disCov)
|
||||
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, rdb)
|
||||
return &consumerHandler
|
||||
}
|
||||
|
||||
@ -65,7 +88,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
||||
var err error
|
||||
switch msgFromMQ.MsgData.SessionType {
|
||||
case constant.SuperGroupChatType:
|
||||
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||
err = c.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||
default:
|
||||
var pushUserIDList []string
|
||||
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
||||
@ -74,18 +97,14 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
||||
} else {
|
||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
|
||||
}
|
||||
err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData)
|
||||
err = c.Push2User(ctx, pushUserIDList, pbData.MsgData)
|
||||
}
|
||||
if err != nil {
|
||||
if err == errNoOfflinePusher {
|
||||
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
|
||||
} else {
|
||||
log.ZError(ctx, "push failed", err, "msg", pbData.String())
|
||||
}
|
||||
log.ZError(ctx, "push failed", err, "msg", pbData.String())
|
||||
}
|
||||
}
|
||||
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (*ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (*ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim,
|
||||
) error {
|
||||
@ -96,3 +115,243 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
|
||||
func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
|
||||
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
||||
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
|
||||
|
||||
if !c.shouldPushOffline(ctx, msg) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, v := range wsResults {
|
||||
//message sender do not need offline push
|
||||
if msg.SendID == v.UserID {
|
||||
continue
|
||||
}
|
||||
//receiver online push success
|
||||
if v.OnlinePush {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
offlinePUshUserID := []string{msg.RecvID}
|
||||
//receiver offline push
|
||||
if err = callbackOfflinePush(ctx, offlinePUshUserID, msg, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.offlinePushMsg(ctx, msg, offlinePUshUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
||||
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
||||
var pushToUserIDs []string
|
||||
if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
|
||||
|
||||
if !c.shouldPushOffline(ctx, msg) {
|
||||
return nil
|
||||
}
|
||||
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
|
||||
|
||||
//filter some user, like don not disturb or don't need offline push etc.
|
||||
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Use offline push messaging
|
||||
if len(needOfflinePushUserIDs) > 0 {
|
||||
var offlinePushUserIDs []string
|
||||
err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(offlinePushUserIDs) > 0 {
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
|
||||
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
|
||||
title, content, opts, err := c.getOfflinePushInfos(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
|
||||
if err != nil {
|
||||
prommetrics.MsgOfflinePushFailedCounter.Inc()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, groupID string, msg *sdkws.MsgData,
|
||||
offlinePushUserIDs []string) (userIDs []string, err error) {
|
||||
|
||||
//todo local cache Obtain the difference set through local comparison.
|
||||
needOfflinePushUserIDs, err := c.conversationRpcClient.GetConversationOfflinePushUserIDs(
|
||||
ctx, utils.GenGroupConversationID(groupID), offlinePushUserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return needOfflinePushUserIDs, nil
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
|
||||
type AtTextElem struct {
|
||||
Text string `json:"text,omitempty"`
|
||||
AtUserList []string `json:"atUserList,omitempty"`
|
||||
IsAtSelf bool `json:"isAtSelf"`
|
||||
}
|
||||
|
||||
opts = &offlinepush.Opts{Signal: &offlinepush.Signal{}}
|
||||
if msg.OfflinePushInfo != nil {
|
||||
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
|
||||
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
|
||||
opts.Ex = msg.OfflinePushInfo.Ex
|
||||
}
|
||||
|
||||
if msg.OfflinePushInfo != nil {
|
||||
title = msg.OfflinePushInfo.Title
|
||||
content = msg.OfflinePushInfo.Desc
|
||||
}
|
||||
if title == "" {
|
||||
switch msg.ContentType {
|
||||
case constant.Text:
|
||||
fallthrough
|
||||
case constant.Picture:
|
||||
fallthrough
|
||||
case constant.Voice:
|
||||
fallthrough
|
||||
case constant.Video:
|
||||
fallthrough
|
||||
case constant.File:
|
||||
title = constant.ContentType2PushContent[int64(msg.ContentType)]
|
||||
case constant.AtText:
|
||||
ac := AtTextElem{}
|
||||
_ = utils.JsonStringToStruct(string(msg.Content), &ac)
|
||||
case constant.SignalingNotification:
|
||||
title = constant.ContentType2PushContent[constant.SignalMsg]
|
||||
default:
|
||||
title = constant.ContentType2PushContent[constant.Common]
|
||||
}
|
||||
}
|
||||
if content == "" {
|
||||
content = title
|
||||
}
|
||||
return
|
||||
}
|
||||
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
|
||||
if len(*pushToUserIDs) == 0 {
|
||||
*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch msg.ContentType {
|
||||
case constant.MemberQuitNotification:
|
||||
var tips sdkws.MemberQuitTips
|
||||
if unmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, []string{tips.QuitUser.UserID}); err != nil {
|
||||
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userID", tips.QuitUser.UserID)
|
||||
}
|
||||
*pushToUserIDs = append(*pushToUserIDs, tips.QuitUser.UserID)
|
||||
case constant.MemberKickedNotification:
|
||||
var tips sdkws.MemberKickedTips
|
||||
if unmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
|
||||
if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, kickedUsers); err != nil {
|
||||
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", kickedUsers)
|
||||
}
|
||||
|
||||
*pushToUserIDs = append(*pushToUserIDs, kickedUsers...)
|
||||
case constant.GroupDismissedNotification:
|
||||
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
|
||||
var tips sdkws.GroupDismissedTips
|
||||
if unmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
|
||||
if len(config.Config.Manager.UserID) > 0 {
|
||||
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
|
||||
}
|
||||
defer func(groupID string) {
|
||||
if err = c.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
|
||||
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
|
||||
}
|
||||
}(groupID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
|
||||
conversationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
|
||||
maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
|
||||
}
|
||||
|
||||
func unmarshalNotificationElem(bytes []byte, t any) error {
|
||||
var notification sdkws.NotificationElem
|
||||
if err := json.Unmarshal(bytes, ¬ification); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return json.Unmarshal([]byte(notification.Detail), t)
|
||||
}
|
||||
|
||||
func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgData) bool {
|
||||
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
||||
if !isOfflinePush {
|
||||
return false
|
||||
}
|
||||
if msg.ContentType == constant.SignalingNotification {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -1,109 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
pbpush "github.com/OpenIMSDK/protocol/push"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
)
|
||||
|
||||
type pushServer struct {
|
||||
pusher *Pusher
|
||||
}
|
||||
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cacheModel := cache.NewMsgCacheModel(rdb)
|
||||
offlinePusher := NewOfflinePusher(cacheModel)
|
||||
database := controller.NewPushDatabase(cacheModel)
|
||||
groupRpcClient := rpcclient.NewGroupRpcClient(client)
|
||||
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
|
||||
msgRpcClient := rpcclient.NewMessageRpcClient(client)
|
||||
pusher := NewPusher(
|
||||
client,
|
||||
offlinePusher,
|
||||
database,
|
||||
rpccache.NewGroupLocalCache(groupRpcClient, rdb),
|
||||
rpccache.NewConversationLocalCache(conversationRpcClient, rdb),
|
||||
&conversationRpcClient,
|
||||
&groupRpcClient,
|
||||
&msgRpcClient,
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
pbpush.RegisterPushMsgServiceServer(server, &pushServer{
|
||||
pusher: pusher,
|
||||
})
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
consumer := NewConsumer(pusher)
|
||||
consumer.Start()
|
||||
}()
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (resp *pbpush.PushMsgResp, err error) {
|
||||
switch pbData.MsgData.SessionType {
|
||||
case constant.SuperGroupChatType:
|
||||
err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||
default:
|
||||
var pushUserIDList []string
|
||||
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
||||
if !isSenderSync {
|
||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
||||
} else {
|
||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
|
||||
}
|
||||
err = r.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData)
|
||||
}
|
||||
if err != nil {
|
||||
if err != errNoOfflinePusher {
|
||||
return nil, err
|
||||
} else {
|
||||
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
|
||||
}
|
||||
}
|
||||
return &pbpush.PushMsgResp{}, nil
|
||||
}
|
||||
|
||||
func (r *pushServer) DelUserPushToken(
|
||||
ctx context.Context,
|
||||
req *pbpush.DelUserPushTokenReq,
|
||||
) (resp *pbpush.DelUserPushTokenResp, err error) {
|
||||
if err = r.pusher.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbpush.DelUserPushTokenResp{}, nil
|
||||
}
|
@ -1,511 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
|
||||
"google.golang.org/grpc"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/conversation"
|
||||
"github.com/OpenIMSDK/protocol/msggateway"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/discoveryregistry"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/OpenIMSDK/tools/mcontext"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||
)
|
||||
|
||||
type Pusher struct {
|
||||
database controller.PushDatabase
|
||||
discov discoveryregistry.SvcDiscoveryRegistry
|
||||
offlinePusher offlinepush.OfflinePusher
|
||||
groupLocalCache *rpccache.GroupLocalCache
|
||||
conversationLocalCache *rpccache.ConversationLocalCache
|
||||
msgRpcClient *rpcclient.MessageRpcClient
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
}
|
||||
|
||||
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
|
||||
|
||||
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
|
||||
groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
|
||||
) *Pusher {
|
||||
return &Pusher{
|
||||
discov: discov,
|
||||
database: database,
|
||||
offlinePusher: offlinePusher,
|
||||
groupLocalCache: groupLocalCache,
|
||||
conversationLocalCache: conversationLocalCache,
|
||||
msgRpcClient: msgRpcClient,
|
||||
conversationRpcClient: conversationRpcClient,
|
||||
groupRpcClient: groupRpcClient,
|
||||
}
|
||||
}
|
||||
|
||||
func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
|
||||
var offlinePusher offlinepush.OfflinePusher
|
||||
switch config.Config.Push.Enable {
|
||||
case "getui":
|
||||
offlinePusher = getui.NewClient(cache)
|
||||
case "fcm":
|
||||
offlinePusher = fcm.NewClient(cache)
|
||||
case "jpush":
|
||||
offlinePusher = jpush.NewClient()
|
||||
default:
|
||||
offlinePusher = dummy.NewClient()
|
||||
}
|
||||
return offlinePusher
|
||||
}
|
||||
|
||||
func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
|
||||
conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
|
||||
maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)
|
||||
}
|
||||
|
||||
func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error {
|
||||
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
||||
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
// push
|
||||
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
||||
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
|
||||
|
||||
if !isOfflinePush {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, v := range wsResults {
|
||||
if !v.OnlinePush && msg.SendID == v.UserID {
|
||||
if err = callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.offlinePushMsg(ctx, msg.SendID, msg, []string{v.UserID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error {
|
||||
var notification sdkws.NotificationElem
|
||||
if err := json.Unmarshal(bytes, ¬ification); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return json.Unmarshal([]byte(notification.Detail), t)
|
||||
}
|
||||
|
||||
/*
|
||||
k8s deployment,offline push group messages function
|
||||
*/
|
||||
func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error {
|
||||
|
||||
var needOfflinePushUserIDs []string
|
||||
for _, v := range wsResults {
|
||||
if !v.OnlinePush {
|
||||
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
|
||||
}
|
||||
}
|
||||
if len(needOfflinePushUserIDs) > 0 {
|
||||
var offlinePushUserIDs []string
|
||||
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(offlinePushUserIDs) > 0 {
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
if msg.ContentType != constant.SignalingNotification {
|
||||
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
|
||||
ctx,
|
||||
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.UserIDs) > 0 {
|
||||
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
||||
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
||||
var pushToUserIDs []string
|
||||
if err = callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(pushToUserIDs) == 0 {
|
||||
pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch msg.ContentType {
|
||||
case constant.MemberQuitNotification:
|
||||
var tips sdkws.MemberQuitTips
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
defer func(groupID string, userIDs []string) {
|
||||
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
||||
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
||||
}
|
||||
}(groupID, []string{tips.QuitUser.UserID})
|
||||
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
|
||||
case constant.MemberKickedNotification:
|
||||
var tips sdkws.MemberKickedTips
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
|
||||
defer func(groupID string, userIDs []string) {
|
||||
if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
||||
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
||||
}
|
||||
}(groupID, kickedUsers)
|
||||
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
|
||||
case constant.GroupDismissedNotification:
|
||||
if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到
|
||||
var tips sdkws.GroupDismissedTips
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
|
||||
if len(config.Config.Manager.UserID) > 0 {
|
||||
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
|
||||
}
|
||||
defer func(groupID string) {
|
||||
if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil {
|
||||
log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID)
|
||||
}
|
||||
}(groupID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
|
||||
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
||||
if isOfflinePush && config.Config.Envs.Discovery == "k8s" {
|
||||
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
|
||||
}
|
||||
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
|
||||
var (
|
||||
onlineSuccessUserIDs = []string{msg.SendID}
|
||||
webAndPcBackgroundUserIDs []string
|
||||
)
|
||||
|
||||
for _, v := range wsResults {
|
||||
if v.OnlinePush && v.UserID != msg.SendID {
|
||||
onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID)
|
||||
}
|
||||
|
||||
if v.OnlinePush {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(v.Resp) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, singleResult := range v.Resp {
|
||||
if singleResult.ResultCode != -2 {
|
||||
continue
|
||||
}
|
||||
|
||||
isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC
|
||||
isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID
|
||||
|
||||
if isPC || isWebID {
|
||||
webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
|
||||
|
||||
// Use offline push messaging
|
||||
if len(needOfflinePushUserIDs) > 0 {
|
||||
var offlinePushUserIDs []string
|
||||
err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(offlinePushUserIDs) > 0 {
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
if msg.ContentType != constant.SignalingNotification {
|
||||
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
|
||||
ctx,
|
||||
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.UserIDs) > 0 {
|
||||
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
||||
return err
|
||||
}
|
||||
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
var usersHost = make(map[string][]string)
|
||||
for _, v := range pushToUserIDs {
|
||||
tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "get msggateway hash error", err)
|
||||
return nil, err
|
||||
}
|
||||
tUsers, tbl := usersHost[tHost]
|
||||
if tbl {
|
||||
tUsers = append(tUsers, v)
|
||||
usersHost[tHost] = tUsers
|
||||
} else {
|
||||
usersHost[tHost] = []string{v}
|
||||
}
|
||||
}
|
||||
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
|
||||
var usersConns = make(map[*grpc.ClientConn][]string)
|
||||
for host, userIds := range usersHost {
|
||||
tconn, _ := p.discov.GetConn(ctx, host)
|
||||
usersConns[tconn] = userIds
|
||||
}
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg = errgroup.Group{}
|
||||
maxWorkers = config.Config.Push.MaxConcurrentWorkers
|
||||
)
|
||||
if maxWorkers < 3 {
|
||||
maxWorkers = 3
|
||||
}
|
||||
wg.SetLimit(maxWorkers)
|
||||
for conn, userIds := range usersConns {
|
||||
tcon := conn
|
||||
tuserIds := userIds
|
||||
wg.Go(func() error {
|
||||
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
|
||||
msgClient := msggateway.NewMsgGatewayClient(tcon)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
log.ZDebug(ctx, "push result", "reply", reply)
|
||||
if reply != nil && reply.SinglePushResult != nil {
|
||||
mu.Lock()
|
||||
wsResults = append(wsResults, reply.SinglePushResult...)
|
||||
mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
_ = wg.Wait()
|
||||
return wsResults, nil
|
||||
}
|
||||
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
if config.Config.Envs.Discovery == "k8s" {
|
||||
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
|
||||
}
|
||||
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg = errgroup.Group{}
|
||||
input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}
|
||||
maxWorkers = config.Config.Push.MaxConcurrentWorkers
|
||||
)
|
||||
|
||||
if maxWorkers < 3 {
|
||||
maxWorkers = 3
|
||||
}
|
||||
|
||||
wg.SetLimit(maxWorkers)
|
||||
|
||||
// Online push message
|
||||
for _, conn := range conns {
|
||||
conn := conn // loop var safe
|
||||
wg.Go(func() error {
|
||||
msgClient := msggateway.NewMsgGatewayClient(conn)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.ZDebug(ctx, "push result", "reply", reply)
|
||||
if reply != nil && reply.SinglePushResult != nil {
|
||||
mu.Lock()
|
||||
wsResults = append(wsResults, reply.SinglePushResult...)
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
_ = wg.Wait()
|
||||
|
||||
// always return nil
|
||||
return wsResults, nil
|
||||
}
|
||||
|
||||
func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
|
||||
title, content, opts, err := p.getOfflinePushInfos(conversationID, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
|
||||
if err != nil {
|
||||
prommetrics.MsgOfflinePushFailedCounter.Inc()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts, err error) {
|
||||
opts = &offlinepush.Opts{Signal: &offlinepush.Signal{}}
|
||||
// if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd {
|
||||
// req := &sdkws.SignalReq{}
|
||||
// if err := proto.Unmarshal(msg.Content, req); err != nil {
|
||||
// return nil, utils.Wrap(err, "")
|
||||
// }
|
||||
// switch req.Payload.(type) {
|
||||
// case *sdkws.SignalReq_Invite, *sdkws.SignalReq_InviteInGroup:
|
||||
// opts.Signal = &offlinepush.Signal{ClientMsgID: msg.ClientMsgID}
|
||||
// }
|
||||
// }
|
||||
if msg.OfflinePushInfo != nil {
|
||||
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
|
||||
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
|
||||
opts.Ex = msg.OfflinePushInfo.Ex
|
||||
}
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
|
||||
if p.offlinePusher == nil {
|
||||
err = errNoOfflinePusher
|
||||
return
|
||||
}
|
||||
|
||||
type atContent struct {
|
||||
Text string `json:"text"`
|
||||
AtUserList []string `json:"atUserList"`
|
||||
IsAtSelf bool `json:"isAtSelf"`
|
||||
}
|
||||
|
||||
opts, err = p.GetOfflinePushOpts(msg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if msg.OfflinePushInfo != nil {
|
||||
title = msg.OfflinePushInfo.Title
|
||||
content = msg.OfflinePushInfo.Desc
|
||||
}
|
||||
if title == "" {
|
||||
switch msg.ContentType {
|
||||
case constant.Text:
|
||||
fallthrough
|
||||
case constant.Picture:
|
||||
fallthrough
|
||||
case constant.Voice:
|
||||
fallthrough
|
||||
case constant.Video:
|
||||
fallthrough
|
||||
case constant.File:
|
||||
title = constant.ContentType2PushContent[int64(msg.ContentType)]
|
||||
case constant.AtText:
|
||||
ac := atContent{}
|
||||
_ = utils.JsonStringToStruct(string(msg.Content), &ac)
|
||||
if utils.IsContain(conversationID, ac.AtUserList) {
|
||||
title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
|
||||
} else {
|
||||
title = constant.ContentType2PushContent[constant.GroupMsg]
|
||||
}
|
||||
case constant.SignalingNotification:
|
||||
title = constant.ContentType2PushContent[constant.SignalMsg]
|
||||
default:
|
||||
title = constant.ContentType2PushContent[constant.Common]
|
||||
}
|
||||
}
|
||||
if content == "" {
|
||||
content = title
|
||||
}
|
||||
return
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func GetContent(msg *sdkws.MsgData) string {
|
||||
if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd {
|
||||
var tips sdkws.TipsComm
|
||||
_ = proto.Unmarshal(msg.Content, &tips)
|
||||
content := tips.JsonDetail
|
||||
return content
|
||||
} else {
|
||||
return string(msg.Content)
|
||||
}
|
||||
}
|
@ -114,6 +114,14 @@ func (c *ConversationRpcClient) GetConversationsByConversationID(ctx context.Con
|
||||
return resp.Conversations, nil
|
||||
}
|
||||
|
||||
func (c *ConversationRpcClient) GetConversationOfflinePushUserIDs(ctx context.Context, conversationID string, userIDs []string) ([]string, error) {
|
||||
resp, err := c.Client.GetConversationOfflinePushUserIDs(ctx, &pbconversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationID, UserIDs: userIDs})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp.UserIDs, nil
|
||||
}
|
||||
|
||||
func (c *ConversationRpcClient) GetConversations(
|
||||
ctx context.Context,
|
||||
ownerUserID string,
|
||||
|
Loading…
x
Reference in New Issue
Block a user