diff --git a/cmd/push/main.go b/cmd/push/main.go index 9658d4249..66cbd98c5 100644 --- a/cmd/push/main.go +++ b/cmd/push/main.go @@ -19,7 +19,8 @@ func main() { wg.Add(1) log.NewPrivateLog(constant.LogFileName) fmt.Println("start push rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - push.Init(*rpcPort) - push.Run(*prometheusPort) + pusher := push.Push{} + pusher.Init(*rpcPort) + pusher.Run(*prometheusPort) wg.Wait() } diff --git a/config/config.yaml b/config/config.yaml index 771ea9666..7f3f48e27 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -150,7 +150,7 @@ rpcport: #rpc服务端口 默认即可 openImAuthPort: [ 10160 ] openImPushPort: [ 10170 ] openImConversationPort: [ 10230 ] - openImRealTimeCommPort: [ 11300 ] + openImRtcPort: [ 11300 ] rpcregistername: #rpc注册服务名,默认即可 @@ -158,11 +158,11 @@ rpcregistername: #rpc注册服务名,默认即可 openImFriendName: Friend openImMsgName: Msg openImPushName: Push - openImRelayName: Relay + openImMessageGatewayName: MessageGateway openImGroupName: Group openImAuthName: Auth openImConversationName: Conversation - openImRealTimeCommName: RealTimeComm + openImRtcName: Rtc log: storageLocation: ../logs/ @@ -326,5 +326,5 @@ prometheus: authPrometheusPort: [ 20160 ] pushPrometheusPort: [ 20170 ] conversationPrometheusPort: [ 20230 ] - realTimeCommPrometheusPort: [ 21300 ] + RtcPrometheusPort: [ 21300 ] messageTransferPrometheusPort: [ 21400, 21401, 21402, 21403 ] # 端口数量和 script/path_info.cfg msg_transfer_service_num保持一致 diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 5f2765fe8..b345ccaca 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -36,11 +36,11 @@ func Init(rpcPort, wsPort int) { initPrometheus() } -func Run(promethuesPort int) { +func Run(prometheusPort int) { go ws.run() go rpcSvr.run() go func() { - err := prome.StartPromeSrv(promethuesPort) + err := prome.StartPromeSrv(prometheusPort) if err != nil { panic(err) } diff --git a/internal/msggateway/logic.go b/internal/msggateway/logic.go index 5752ce74f..54ac0bf7d 100644 --- a/internal/msggateway/logic.go +++ b/internal/msggateway/logic.go @@ -287,7 +287,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID) if isPass { signalResp := pbRtc.SignalResp{} - etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRealTimeCommName, m.OperationID) + etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRtcName, m.OperationID) if etcdConn == nil { errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil" log.NewError(m.OperationID, errMsg) @@ -301,7 +301,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { } respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req) if err != nil { - log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.RpcRegisterName.OpenImRealTimeCommName) + log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.RpcRegisterName.OpenImRtcName) ws.sendSignalMsgResp(conn, 204, "grpc SignalMessageAssemble failed: "+err.Error(), m, &signalResp) return } diff --git a/internal/push/callback.go b/internal/push/callback.go index d0777b2fa..d293b95df 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -6,7 +6,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/http" "Open_IM/pkg/common/tracelog" - common "Open_IM/pkg/proto/sdkws" + "Open_IM/pkg/proto/sdkws" "Open_IM/pkg/utils" "context" ) @@ -15,7 +15,7 @@ func url() string { return config.Config.Callback.CallbackUrl } -func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.MsgData, offlinePushUserIDList *[]string) error { +func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { if !config.Config.Callback.CallbackOfflinePush.Enable { return nil } @@ -27,7 +27,7 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), }, - UserIDList: userIDList, + UserIDList: userIDs, }, OfflinePushInfo: msg.OfflinePushInfo, ClientMsgID: msg.ClientMsgID, @@ -43,8 +43,8 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M if err != nil { return err } - if len(resp.UserIDList) != 0 { - *offlinePushUserIDList = resp.UserIDList + if len(resp.UserIDs) != 0 { + *offlinePushUserIDs = resp.UserIDs } if resp.OfflinePushInfo != nil { msg.OfflinePushInfo = resp.OfflinePushInfo @@ -52,19 +52,19 @@ func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M return nil } -func callbackOnlinePush(operationID string, userIDList []string, msg *common.MsgData) error { - if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDList...) { +func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { + if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) { return nil } req := callbackstruct.CallbackBeforePushReq{ UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ CallbackCommand: constant.CallbackOnlinePushCommand, - OperationID: operationID, + OperationID: tracelog.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), }, - UserIDList: userIDList, + UserIDList: userIDs, }, ClientMsgID: msg.ClientMsgID, SendID: msg.SendID, @@ -78,7 +78,7 @@ func callbackOnlinePush(operationID string, userIDList []string, msg *common.Msg return http.CallBackPostReturn(url(), req, resp, config.Config.Callback.CallbackOnlinePush) } -func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *common.MsgData, pushToUserList *[]string) error { +func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *sdkws.MsgData, pushToUserIDs *[]string) error { if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable { return nil } @@ -89,21 +89,21 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg PlatformID: int(msg.SenderPlatformID), Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)), }, - ClientMsgID: msg.ClientMsgID, - SendID: msg.SendID, - GroupID: groupID, - ContentType: msg.ContentType, - SessionType: msg.SessionType, - AtUserIDList: msg.AtUserIDList, - Content: utils.GetContent(msg), - Seq: msg.Seq, + ClientMsgID: msg.ClientMsgID, + SendID: msg.SendID, + GroupID: groupID, + ContentType: msg.ContentType, + SessionType: msg.SessionType, + AtUserIDs: msg.AtUserIDList, + Content: utils.GetContent(msg), + Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { return err } - if len(resp.UserIDList) != 0 { - *pushToUserList = resp.UserIDList + if len(resp.UserIDs) != 0 { + *pushToUserIDs = resp.UserIDs } return nil } diff --git a/internal/push/fcm/push.go b/internal/push/fcm/push.go index 6042a4bf8..fcab72f29 100644 --- a/internal/push/fcm/push.go +++ b/internal/push/fcm/push.go @@ -22,7 +22,7 @@ type Fcm struct { cache cache.Cache } -func newFcmClient(cache cache.Cache) *Fcm { +func NewClient(cache cache.Cache) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { @@ -42,7 +42,7 @@ func newFcmClient(cache cache.Cache) *Fcm { return &Fcm{fcmMsgCli: fcmMsgClient} } -func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts push.Opts) error { +func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error { // accounts->registrationToken allTokens := make(map[string][]string, 0) for _, account := range userIDs { @@ -105,7 +105,6 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, } messages = append(messages, temp) } - } messageCount := len(messages) if messageCount > 0 { diff --git a/internal/push/fcm/push_test.go b/internal/push/fcm/push_test.go index 5e31058fd..4be39b685 100644 --- a/internal/push/fcm/push_test.go +++ b/internal/push/fcm/push_test.go @@ -2,14 +2,15 @@ package fcm import ( "Open_IM/internal/push" - "fmt" + "Open_IM/pkg/common/db/cache" + "context" "github.com/stretchr/testify/assert" "testing" ) func Test_Push(t *testing.T) { - offlinePusher := NewFcm() - resp, err := offlinePusher.Push([]string{"test_uid"}, "test", "test", "12321", push.PushOpts{}) + var redis cache.Cache + offlinePusher := NewClient(redis) + err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &push.Opts{}) assert.Nil(t, err) - fmt.Println(resp) } diff --git a/internal/push/getui/body.go b/internal/push/getui/body.go index 6c3628d1d..713b01cd0 100644 --- a/internal/push/getui/body.go +++ b/internal/push/getui/body.go @@ -1,13 +1,32 @@ package getui -import "Open_IM/pkg/common/config" +import ( + "Open_IM/pkg/common/config" + "fmt" +) -type CommonResp struct { +type Resp struct { Code int `json:"code"` Msg string `json:"msg"` Data interface{} `json:"data"` } +func (r *Resp) parseError() (err error) { + switch r.Code { + case tokenExpireCode: + err = TokenExpireError + case 0: + err = nil + default: + err = fmt.Errorf("code %d, msg %s", r.Code, r.Msg) + } + return err +} + +type RespI interface { + parseError() error +} + type AuthReq struct { Sign string `json:"sign"` Timestamp string `json:"timestamp"` @@ -95,6 +114,10 @@ type Options struct { } `json:"VV"` } +type Payload struct { + IsSignal bool `json:"isSignal"` +} + func newPushReq(title, content string) PushReq { pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{ Title: title, @@ -106,6 +129,11 @@ func newPushReq(title, content string) PushReq { return pushReq } +func newBatchPushReq(userIDs []string, taskID string) PushReq { + var IsAsync = true + return PushReq{Audience: &Audience{Alias: userIDs}, IsAsync: &IsAsync, TaskID: &taskID} +} + func (pushReq *PushReq) setPushChannel(title string, body string) { pushReq.PushChannel = &PushChannel{} // autoBadge := "+1" diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 525e335dd..7d3ffe2a0 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -4,23 +4,23 @@ import ( "Open_IM/internal/push" "Open_IM/pkg/common/config" "Open_IM/pkg/common/db/cache" - //http2 "Open_IM/pkg/common/http" - "Open_IM/pkg/common/log" + http2 "Open_IM/pkg/common/http" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils/splitter" + "github.com/go-redis/redis/v8" + "Open_IM/pkg/utils" - "bytes" "context" "crypto/sha256" "encoding/hex" - "encoding/json" "errors" - "io/ioutil" - "net/http" "strconv" "time" ) var ( TokenExpireError = errors.New("token expire") + UserIDEmptyError = errors.New("userIDs is empty") ) const ( @@ -29,62 +29,56 @@ const ( taskURL = "/push/list/message" batchPushURL = "/push/list/alias" - tokenExpire = 10001 - ttl = 0 + // codes + tokenExpireCode = 10001 + tokenExpireTime = 60 * 60 * 23 + taskIDTTL = 1000 * 60 * 60 * 24 ) type Client struct { - cache cache.Cache + cache cache.Cache + tokenExpireTime int64 + taskIDTTL int64 } -func newClient(cache cache.Cache) *Client { - return &Client{cache: cache} +func NewClient(cache cache.Cache) *Client { + return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} } -func (g *Client) Push(ctx context.Context, userIDs []string, title, content, operationID string, opts *push.Opts) error { +func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error { token, err := g.cache.GetGetuiToken(ctx) if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "GetGetuiToken failed", err.Error()) - } - if token == "" || err != nil { - token, err = g.getTokenAndSave2Redis(ctx) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed", err.Error()) - return utils.Wrap(err, "") + if err == redis.Nil { + token, err = g.getTokenAndSave2Redis(ctx) + if err != nil { + return err + } + } else { + return err } } pushReq := newPushReq(title, content) pushReq.setPushChannel(title, content) - pushResp := struct{}{} if len(userIDs) > 1 { - taskID, err := g.GetTaskID(ctx, token, pushReq) - if err != nil { - return utils.Wrap(err, "GetTaskIDAndSave2Redis failed") + maxNum := 999 + if len(userIDs) > maxNum { + s := splitter.NewSplitter(maxNum, userIDs) + for _, v := range s.GetSplitResult() { + err = g.batchPush(ctx, token, v.Item, pushReq) + } + } else { + err = g.batchPush(ctx, token, userIDs, pushReq) } - pushReq = PushReq{Audience: &Audience{Alias: userIDs}} - var IsAsync = true - pushReq.IsAsync = &IsAsync - pushReq.TaskID = &taskID - err = g.request(ctx, batchPushURL, pushReq, token, &pushResp) + } else if len(userIDs) == 1 { + err = g.singlePush(ctx, token, userIDs[0], pushReq) } else { - reqID := utils.OperationIDGenerator() - pushReq.RequestID = &reqID - pushReq.Audience = &Audience{Alias: []string{userIDs[0]}} - err = g.request(ctx, pushURL, pushReq, token, &pushResp) + return UserIDEmptyError } switch err { case TokenExpireError: token, err = g.getTokenAndSave2Redis(ctx) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed, ", err.Error()) - } else { - log.NewInfo(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis: ", token) - } } - if err != nil { - return utils.Wrap(err, "push failed") - } - return utils.Wrap(err, "") + return err } func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) { @@ -101,7 +95,6 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir if err != nil { return "", 0, err } - //log.NewInfo(operationID, utils.GetSelfFuncName(), "result: ", respAuth) expire, err := strconv.Atoi(respAuth.ExpireTime) return respAuth.Token, int64(expire), err } @@ -117,63 +110,59 @@ func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) ( return respTask.TaskID, nil } -func (g *Client) request(ctx context.Context, url string, content interface{}, token string, output interface{}) error { - con, err := json.Marshal(content) +// max num is 999 +func (g *Client) batchPush(ctx context.Context, token string, userIDs []string, pushReq PushReq) error { + taskID, err := g.GetTaskID(ctx, token, pushReq) if err != nil { return err } - client := &http.Client{} - req, err := http.NewRequest("POST", config.Config.Push.Getui.PushUrl+url, bytes.NewBuffer(con)) + pushReq = newBatchPushReq(userIDs, taskID) + return g.request(ctx, batchPushURL, pushReq, token, nil) +} + +func (g *Client) singlePush(ctx context.Context, token, userID string, pushReq PushReq) error { + operationID := tracelog.GetOperationID(ctx) + pushReq.RequestID = &operationID + pushReq.Audience = &Audience{Alias: []string{userID}} + return g.request(ctx, pushURL, pushReq, token, nil) +} + +func (g *Client) request(ctx context.Context, url string, input interface{}, token string, output interface{}) error { + header := map[string]string{"token": token} + resp := &Resp{} + resp.Data = output + return g.postReturn(config.Config.Push.Getui.PushUrl+url, header, input, resp, 3) +} + +func (g *Client) postReturn(url string, header map[string]string, input interface{}, output RespI, timeout int) error { + err := http2.PostReturn(url, header, input, output, timeout) if err != nil { return err } - if token != "" { - req.Header.Set("token", token) - } - req.Header.Set("content-type", "application/json") - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - result, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - //log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result)) - commonResp := CommonResp{} - commonResp.Data = output - if err := json.Unmarshal(result, &commonResp); err != nil { - return err - } - if commonResp.Code == tokenExpire { - return TokenExpireError - } - return nil + return output.parseError() } func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err error) { token, _, err = g.Auth(ctx, time.Now().UnixNano()/1e6) if err != nil { - return "", utils.Wrap(err, "Auth failed") + return } - err = g.cache.SetGetuiTaskID(ctx, token, 60*60*23) + err = g.cache.SetGetuiToken(ctx, token, 60*60*23) if err != nil { - return "", utils.Wrap(err, "Auth failed") + return } return token, nil } func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) { - ttl := int64(1000 * 60 * 60 * 24) - pushReq.Settings = &Settings{TTL: &ttl} + pushReq.Settings = &Settings{TTL: &g.taskIDTTL} taskID, err = g.GetTaskID(ctx, token, pushReq) if err != nil { - return "", utils.Wrap(err, "GetTaskIDAndSave2Redis failed") + return } - err = g.cache.SetGetuiTaskID(ctx, taskID, 60*60*23) + err = g.cache.SetGetuiTaskID(ctx, taskID, g.tokenExpireTime) if err != nil { - return "", utils.Wrap(err, "Auth failed") + return } return token, nil } diff --git a/internal/push/init.go b/internal/push/init.go index 5482c65d4..51fcce428 100644 --- a/internal/push/init.go +++ b/internal/push/init.go @@ -12,47 +12,46 @@ import ( jpush "Open_IM/internal/push/jpush" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/prome" "Open_IM/pkg/statistics" "fmt" ) -var ( +type Push struct { rpcServer RPCServer pushCh ConsumerHandler offlinePusher OfflinePusher successCount uint64 -) - -func Init(rpcPort int) { - rpcServer.Init(rpcPort) - pushCh.Init() - } -func init() { - statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + +func (p *Push) Init(rpcPort int) { + var cacheInterface cache.Cache + + p.rpcServer.Init(rpcPort, cacheInterface) + p.pushCh.Init() + statistics.NewStatistics(&p.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) if *config.Config.Push.Getui.Enable { - offlinePusher = getui.GetuiClient + p.offlinePusher = getui.NewClient(cacheInterface) } if config.Config.Push.Jpns.Enable { - offlinePusher = jpush.JPushClient + p.offlinePusher = jpush.NewClient() } - if config.Config.Push.Fcm.Enable { - offlinePusher = fcm.NewFcm() + p.offlinePusher = fcm.NewClient(cacheInterface) } } -func initPrometheus() { +func (p *Push) initPrometheus() { prome.NewMsgOfflinePushSuccessCounter() prome.NewMsgOfflinePushFailedCounter() } -func Run(promethuesPort int) { - go rpcServer.run() - go pushCh.ConsumerGroup.RegisterHandleAndConsumer(&pushCh) +func (p *Push) Run(prometheusPort int) { + go p.rpcServer.run() + go p.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&p.pushCh) go func() { - err := prome.StartPromeSrv(promethuesPort) + err := prome.StartPromeSrv(prometheusPort) if err != nil { panic(err) } diff --git a/internal/push/jpush/body/audience.go b/internal/push/jpush/body/audience.go index b523693f9..124c1072a 100644 --- a/internal/push/jpush/body/audience.go +++ b/internal/push/jpush/body/audience.go @@ -1,13 +1,11 @@ package body const ( - TAG = "tag" - TAG_AND = "tag_and" - TAG_NOT = "tag_not" - ALIAS = "alias" - REGISTRATION_ID = "registration_id" - SEGMENT = "segment" - ABTEST = "abtest" + TAG = "tag" + TAGAND = "tag_and" + TAGNOT = "tag_not" + ALIAS = "alias" + REGISTRATIONID = "registration_id" ) type Audience struct { @@ -32,11 +30,11 @@ func (a *Audience) SetTag(tags []string) { } func (a *Audience) SetTagAnd(tags []string) { - a.set(TAG_AND, tags) + a.set(TAGAND, tags) } func (a *Audience) SetTagNot(tags []string) { - a.set(TAG_NOT, tags) + a.set(TAGNOT, tags) } func (a *Audience) SetAlias(alias []string) { @@ -44,7 +42,7 @@ func (a *Audience) SetAlias(alias []string) { } func (a *Audience) SetRegistrationId(ids []string) { - a.set(REGISTRATION_ID, ids) + a.set(REGISTRATIONID, ids) } func (a *Audience) SetAll() { diff --git a/internal/push/jpush/push.go b/internal/push/jpush/push.go index 4369b90af..2d4ecb9f2 100644 --- a/internal/push/jpush/push.go +++ b/internal/push/jpush/push.go @@ -3,27 +3,16 @@ package push import ( "Open_IM/internal/push" "Open_IM/internal/push/jpush/body" - "Open_IM/internal/push/jpush/common" "Open_IM/pkg/common/config" - "bytes" + http2 "Open_IM/pkg/common/http" + "context" "encoding/base64" - "encoding/json" "fmt" - "io/ioutil" - "net/http" ) -var ( - JPushClient *JPush -) - -func init() { - JPushClient = newJPushClient() -} - type JPush struct{} -func newJPushClient() *JPush { +func NewClient() *JPush { return &JPush{} } @@ -35,18 +24,18 @@ func (j *JPush) SetAlias(cid, alias string) (resp string, err error) { return resp, nil } -func (j *JPush) getAuthorization(Appkey string, MasterSecret string) string { - str := fmt.Sprintf("%s:%s", Appkey, MasterSecret) +func (j *JPush) getAuthorization(appKey string, masterSecret string) string { + str := fmt.Sprintf("%s:%s", appKey, masterSecret) buf := []byte(str) Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(buf)) return Authorization } -func (j *JPush) Push(accounts []string, title, detailContent, operationID string, opts push.PushOpts) (string, error) { +func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *push.Opts) error { var pf body.Platform pf.SetAll() var au body.Audience - au.SetAlias(accounts) + au.SetAlias(userIDs) var no body.Notification var extras body.Extras if opts.Signal.ClientMsgID != "" { @@ -55,35 +44,20 @@ func (j *JPush) Push(accounts []string, title, detailContent, operationID string no.IOSEnableMutableContent() no.SetExtras(extras) no.SetAlert(title) - var me body.Message - me.SetMsgContent(detailContent) - var o body.Options - o.SetApnsProduction(config.Config.IOSPush.Production) - var po body.PushObj - po.SetPlatform(&pf) - po.SetAudience(&au) - po.SetNotification(&no) - po.SetMessage(&me) - po.SetOptions(&o) - - con, err := json.Marshal(po) - if err != nil { - return "", err - } - client := &http.Client{} - req, err := http.NewRequest("POST", config.Config.Push.Jpns.PushUrl, bytes.NewBuffer(con)) - if err != nil { - return "", err - } - req.Header.Set("Authorization", j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)) - resp, err := client.Do(req) - if err != nil { - return "", err - } - defer resp.Body.Close() - result, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - return string(result), nil + var msg body.Message + msg.SetMsgContent(content) + var opt body.Options + opt.SetApnsProduction(config.Config.IOSPush.Production) + var pushObj body.PushObj + pushObj.SetPlatform(&pf) + pushObj.SetAudience(&au) + pushObj.SetNotification(&no) + pushObj.SetMessage(&msg) + pushObj.SetOptions(&opt) + var resp interface{} + return j.request(pushObj, resp, 5) +} + +func (j *JPush) request(po body.PushObj, resp interface{}, timeout int) error { + return http2.PostReturn(config.Config.Push.Jpns.PushUrl, map[string]string{"Authorization": j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)}, po, resp, timeout) } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index b9bca2888..13dad5704 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -35,9 +35,8 @@ func (c *ConsumerHandler) handleMs2PsChat(msg []byte) { return } pbData := &pbPush.PushMsgReq{ - OperationID: msgFromMQ.OperationID, - MsgData: msgFromMQ.MsgData, - PushToUserID: msgFromMQ.PushToUserID, + MsgData: msgFromMQ.MsgData, + SourceID: msgFromMQ.PushToUserID, } sec := msgFromMQ.MsgData.SendTime / 1000 nowSec := utils.GetCurrentTimestampBySecond() diff --git a/internal/push/push_interface.go b/internal/push/push_interface.go index 7ed3c4b85..7b69fb689 100644 --- a/internal/push/push_interface.go +++ b/internal/push/push_interface.go @@ -3,7 +3,7 @@ package push import "context" type OfflinePusher interface { - Push(ctx context.Context, userIDs []string, title, content, opts *Opts) error + Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error } type Opts struct { diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6a9487c9e..6efcc62dc 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -3,9 +3,10 @@ package push import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/cache" + "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/log" - prome "Open_IM/pkg/common/prometheus" + "Open_IM/pkg/common/prome" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" "context" @@ -22,14 +23,16 @@ type RPCServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + push controller.PushInterface } -func (r *RPCServer) Init(rpcPort int) { +func (r *RPCServer) Init(rpcPort int, cache cache.Cache) { r.rpcPort = rpcPort r.rpcRegisterName = config.Config.RpcRegisterName.OpenImPushName r.etcdSchema = config.Config.Etcd.EtcdSchema r.etcdAddr = config.Config.Etcd.EtcdAddr } + func (r *RPCServer) run() { listenIP := "" if config.Config.ListenIP == "" { @@ -77,29 +80,17 @@ func (r *RPCServer) run() { return } } -func (r *RPCServer) PushMsg(_ context.Context, pbData *pbPush.PushMsgReq) (*pbPush.PushMsgResp, error) { - //Call push module to send message to the user + +func (r *RPCServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (resp *pbPush.PushMsgResp, err error) { switch pbData.MsgData.SessionType { case constant.SuperGroupChatType: MsgToSuperGroupUser(pbData) default: MsgToUser(pbData) } - return &pbPush.PushMsgResp{ - ResultCode: 0, - }, nil - + return &pbPush.PushMsgResp{}, nil } -func (r *RPCServer) DelUserPushToken(c context.Context, req *pbPush.DelUserPushTokenReq) (*pbPush.DelUserPushTokenResp, error) { - log.Debug(req.OperationID, utils.GetSelfFuncName(), "req", req.String()) - var resp pbPush.DelUserPushTokenResp - err := db.DB.DelFcmToken(req.UserID, int(req.PlatformID)) - if err != nil { - errMsg := req.OperationID + " " + "DelFcmToken failed " + err.Error() - log.NewError(req.OperationID, errMsg) - resp.ErrCode = 500 - resp.ErrMsg = errMsg - } - return &resp, nil +func (r *RPCServer) DelUserPushToken(ctx context.Context, req *pbPush.DelUserPushTokenReq) (resp *pbPush.DelUserPushTokenResp, err error) { + return &pbPush.DelUserPushTokenResp{}, r.push.DelFcmToken(ctx, req.UserID, int(req.PlatformID)) } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 31052cf27..3500ed8cc 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -9,280 +9,258 @@ package push import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/cache" + "Open_IM/pkg/common/db/localcache" "Open_IM/pkg/common/log" "Open_IM/pkg/common/prome" - pbPush "Open_IM/pkg/proto/push" - pbRelay "Open_IM/pkg/proto/relay" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/discoveryregistry" + msggateway "Open_IM/pkg/proto/msggateway" pbRtc "Open_IM/pkg/proto/rtc" + "Open_IM/pkg/proto/sdkws" "Open_IM/pkg/utils" "context" + "errors" "github.com/golang/protobuf/proto" - "strings" ) -type AtContent struct { - Text string `json:"text"` - AtUserList []string `json:"atUserList"` - IsAtSelf bool `json:"isAtSelf"` +type Pusher struct { + cache cache.Cache + client discoveryregistry.SvcDiscoveryRegistry + offlinePusher OfflinePusher + groupLocalCache localcache.GroupLocalCache + conversationLocalCache localcache.ConversationLocalCache + successCount int } -func MsgToUser(pushMsg *pbPush.PushMsgReq) { - var wsResult []*pbRelay.SingelMsgToUserResultList - isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) - log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) - - grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID) - - var UIDList = []string{pushMsg.PushToUserID} - callbackResp := callbackOnlinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData) - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush callback Resp") - if callbackResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOnlinePush result: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush stop") - return +func NewPusher(cache cache.Cache, client discoveryregistry.SvcDiscoveryRegistry, offlinePusher OfflinePusher) *Pusher { + return &Pusher{ + cache: cache, + client: client, + offlinePusher: offlinePusher, } +} - //Online push message - log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) - for _, v := range grpcCons { - msgClient := pbRelay.NewRelayClient(v) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}}) - if err != nil { - log.NewError("SuperGroupOnlineBatchPushOneMsg push data to client rpc err", pushMsg.OperationID, "err", err) - continue - } - if reply != nil && reply.SinglePushResult != nil { - wsResult = append(wsResult, reply.SinglePushResult...) - } +func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgData) error { + operationID := tracelog.GetOperationID(ctx) + var userIDs = []string{userID} + log.Debug(operationID, "Get msg from msg_transfer And push msg", msg.String(), userID) + // callback + if err := callbackOnlinePush(ctx, userIDs, msg); err != nil { + return err } - log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData, "isOfflinePush", isOfflinePush) - successCount++ - if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { + // push + wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) + if err != nil { + return err + } + isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush) + p.successCount++ + if isOfflinePush && userID != msg.SendID { // save invitation info for offline push - for _, v := range wsResult { + for _, v := range wsResults { if v.OnlinePush { - return + return nil } } - if pushMsg.MsgData.ContentType == constant.SignalingNotification { - isSend, err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData, pushMsg.PushToUserID) + if msg.ContentType == constant.SignalingNotification { + isSend, err := p.cache.HandleSignalInfo(ctx, msg, userID) if err != nil { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData) - return + return err } if !isSend { - return + return nil } } - var title, detailContent string - callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData, &[]string{}) - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") - if callbackResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") - return - } - if pushMsg.MsgData.OfflinePushInfo != nil { - title = pushMsg.MsgData.OfflinePushInfo.Title - detailContent = pushMsg.MsgData.OfflinePushInfo.Desc - } - if offlinePusher == nil { - return + if err := callbackOfflinePush(ctx, userIDs, msg, &[]string{}); err != nil { + return err } - opts, err := GetOfflinePushOpts(pushMsg) + err = p.OfflinePushMsg(ctx, userID, msg, userIDs) if err != nil { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error()) - } - log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), UIDList, title, detailContent, "opts:", opts) - if title == "" { - switch pushMsg.MsgData.ContentType { - case constant.Text: - fallthrough - case constant.Picture: - fallthrough - case constant.Voice: - fallthrough - case constant.Video: - fallthrough - case constant.File: - title = constant.ContentType2PushContent[int64(pushMsg.MsgData.ContentType)] - case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) { - title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - title = constant.ContentType2PushContent[constant.GroupMsg] - } - case constant.SignalingNotification: - title = constant.ContentType2PushContent[constant.SignalMsg] - default: - title = constant.ContentType2PushContent[constant.Common] - - } - // detailContent = title - } - if detailContent == "" { - detailContent = title - } - pushResult, err := offlinePusher.Push(UIDList, title, detailContent, pushMsg.OperationID, opts) - if err != nil { - prome.PromeInc(prome.MsgOfflinePushFailedCounter) - log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) - } else { - prome.PromeInc(prome.MsgOfflinePushSuccessCounter) - log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) + log.NewError(operationID, "OfflinePushMsg failed", userID) + return err } } + return nil } -func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { - var wsResult []*pbRelay.SingelMsgToUserResultList - isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) - log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String(), config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable) - var pushToUserIDList []string - if config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable { - callbackResp := callbackBeforeSuperGroupOnlinePush(pushMsg.OperationID, pushMsg.PushToUserID, pushMsg.MsgData, &pushToUserIDList) - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") - if callbackResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "onlinePush stop") - return - } - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList) +func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { + operationID := tracelog.GetOperationID(ctx) + log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID) + var pushToUserIDs []string + if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil { + return err } - if len(pushToUserIDList) == 0 { - userIDList, err := utils.GetGroupMemberUserIDList(context.Background(), pushMsg.MsgData.GroupID, pushMsg.OperationID) + if len(pushToUserIDs) == 0 { + pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID) if err != nil { - log.Error(pushMsg.OperationID, "GetGroupMemberUserIDList failed ", err.Error(), pushMsg.MsgData.GroupID) - return + return err } - pushToUserIDList = userIDList } + wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) + if err != nil { + return err + } + log.Debug(operationID, "push_result", wsResults, "sendData", msg) + p.successCount++ + isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + if isOfflinePush { + var onlineSuccessUserIDs []string + var WebAndPcBackgroundUserIDs []string + onlineSuccessUserIDs = append(onlineSuccessUserIDs, msg.SendID) + for _, v := range wsResults { + if v.OnlinePush && v.UserID != msg.SendID { + onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) + } + if !v.OnlinePush { + if len(v.Resp) != 0 { + for _, singleResult := range v.Resp { + if singleResult.ResultCode == -2 { + if constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC || + singleResult.RecvPlatFormID == constant.WebPlatformID { + WebAndPcBackgroundUserIDs = append(WebAndPcBackgroundUserIDs, v.UserID) + } + } + } + } + } + } + needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) + if msg.ContentType != constant.SignalingNotification { + notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID) + if err != nil { + log.Error(operationID, utils.GetSelfFuncName(), "GetRecvMsgNotNotifyUserIDs failed", groupID) + return err + } + needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs) + } + //Use offline push messaging + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err = callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + err = p.OfflinePushMsg(ctx, groupID, msg, offlinePushUserIDs) + if err != nil { + log.NewError(operationID, "OfflinePushMsg failed", groupID) + return err + } + _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs)) + if err != nil { + log.NewError(operationID, "OfflinePushMsg failed", groupID) + return err + } + } + } + return nil +} - grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID) - +func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingelMsgToUserResultList, err error) { + conns, err := p.client.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName) + if err != nil { + return nil, err + } //Online push message - log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) - for _, v := range grpcCons { - msgClient := pbRelay.NewRelayClient(v) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushToUserIDList}) + for _, v := range conns { + msgClient := msggateway.NewRelayClient(v) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{OperationID: tracelog.GetOperationID(ctx), MsgData: msg, PushToUserIDList: pushToUserIDs}) if err != nil { - log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err) + log.NewError(tracelog.GetOperationID(ctx), msg, len(pushToUserIDs), "err", err) continue } if reply != nil && reply.SinglePushResult != nil { - wsResult = append(wsResult, reply.SinglePushResult...) - } - } - log.Debug(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData) - successCount++ - if isOfflinePush { - var onlineSuccessUserIDList []string - onlineSuccessUserIDList = append(onlineSuccessUserIDList, pushMsg.MsgData.SendID) - for _, v := range wsResult { - if v.OnlinePush && v.UserID != pushMsg.MsgData.SendID { - onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID) - } - } - onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, pushToUserIDList) - //Use offline push messaging - var title, detailContent string - if len(onlineFailedUserIDList) > 0 { - var offlinePushUserIDList []string - var needOfflinePushUserIDList []string - callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList, pushMsg.MsgData, &offlinePushUserIDList) - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp") - if callbackResp.ErrCode != 0 { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) - } - if callbackResp.ActionCode != constant.ActionAllow { - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offlinePush stop") - return - } - if pushMsg.MsgData.OfflinePushInfo != nil { - title = pushMsg.MsgData.OfflinePushInfo.Title - detailContent = pushMsg.MsgData.OfflinePushInfo.Desc - } - if len(offlinePushUserIDList) > 0 { - needOfflinePushUserIDList = offlinePushUserIDList - } else { - needOfflinePushUserIDList = onlineFailedUserIDList - } - - if offlinePusher == nil { - return - } - opts, err := GetOfflinePushOpts(pushMsg) - if err != nil { - log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "GetOfflinePushOpts failed", pushMsg, err.Error()) - } - log.NewInfo(pushMsg.OperationID, utils.GetSelfFuncName(), needOfflinePushUserIDList, title, detailContent, "opts:", opts) - if title == "" { - switch pushMsg.MsgData.ContentType { - case constant.Text: - fallthrough - case constant.Picture: - fallthrough - case constant.Voice: - fallthrough - case constant.Video: - fallthrough - case constant.File: - title = constant.ContentType2PushContent[int64(pushMsg.MsgData.ContentType)] - case constant.AtText: - a := AtContent{} - _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a) - if utils.IsContain(pushMsg.PushToUserID, a.AtUserList) { - title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - title = constant.ContentType2PushContent[constant.GroupMsg] - } - case constant.SignalingNotification: - title = constant.ContentType2PushContent[constant.SignalMsg] - default: - title = constant.ContentType2PushContent[constant.Common] - - } - detailContent = title - } - pushResult, err := offlinePusher.Push(needOfflinePushUserIDList, title, detailContent, pushMsg.OperationID, opts) - if err != nil { - prome.PromeInc(prome.MsgOfflinePushFailedCounter) - log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error()) - } else { - prome.PromeInc(prome.MsgOfflinePushSuccessCounter) - log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) - } + wsResults = append(wsResults, reply.SinglePushResult...) } } + return wsResults, nil } -func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts *Opts, err error) { - if pushMsg.MsgData.ContentType > constant.SignalingNotificationBegin && pushMsg.MsgData.ContentType < constant.SignalingNotificationEnd { +func (p *Pusher) OfflinePushMsg(ctx context.Context, sourceID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error { + title, content, opts, err := p.GetOfflinePushInfos(sourceID, msg) + if err != nil { + return err + } + err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) + if err != nil { + prome.PromeInc(prome.MsgOfflinePushFailedCounter) + return err + } + prome.PromeInc(prome.MsgOfflinePushSuccessCounter) + return nil +} + +func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *Opts, err error) { + opts = &Opts{} + if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd { req := &pbRtc.SignalReq{} - if err := proto.Unmarshal(pushMsg.MsgData.Content, req); err != nil { + if err := proto.Unmarshal(msg.Content, req); err != nil { return nil, utils.Wrap(err, "") } - opts = &Opts{} switch req.Payload.(type) { case *pbRtc.SignalReq_Invite, *pbRtc.SignalReq_InviteInGroup: - opts.Signal.ClientMsgID = pushMsg.MsgData.ClientMsgID - log.NewDebug(pushMsg.OperationID, opts) + opts.Signal = &Signal{ClientMsgID: msg.ClientMsgID} } } - if pushMsg.MsgData.OfflinePushInfo != nil { - opts = &Opts{} - opts.IOSBadgeCount = pushMsg.MsgData.OfflinePushInfo.IOSBadgeCount - opts.IOSPushSound = pushMsg.MsgData.OfflinePushInfo.IOSPushSound - opts.Ex = pushMsg.MsgData.OfflinePushInfo.Ex + if msg.OfflinePushInfo != nil { + opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount + opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound + opts.Ex = msg.OfflinePushInfo.Ex } return opts, nil } + +func (p *Pusher) GetOfflinePushInfos(sourceID string, msg *sdkws.MsgData) (title, content string, opts *Opts, err error) { + if p.offlinePusher == nil { + err = errors.New("no offlinePusher is configured") + return + } + type AtContent struct { + Text string `json:"text"` + AtUserList []string `json:"atUserList"` + IsAtSelf bool `json:"isAtSelf"` + } + opts, err = p.GetOfflinePushOpts(msg) + if err != nil { + return + } + if msg.OfflinePushInfo != nil { + title = msg.OfflinePushInfo.Title + content = msg.OfflinePushInfo.Desc + } + if title == "" { + switch msg.ContentType { + case constant.Text: + fallthrough + case constant.Picture: + fallthrough + case constant.Voice: + fallthrough + case constant.Video: + fallthrough + case constant.File: + title = constant.ContentType2PushContent[int64(msg.ContentType)] + case constant.AtText: + a := AtContent{} + _ = utils.JsonStringToStruct(string(msg.Content), &a) + if utils.IsContain(sourceID, a.AtUserList) { + title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] + } else { + title = constant.ContentType2PushContent[constant.GroupMsg] + } + case constant.SignalingNotification: + title = constant.ContentType2PushContent[constant.SignalMsg] + default: + title = constant.ContentType2PushContent[constant.Common] + } + } + if content == "" { + content = title + } + return +} diff --git a/pkg/callbackstruct/push.go b/pkg/callbackstruct/push.go index 8e3dab706..e68694626 100644 --- a/pkg/callbackstruct/push.go +++ b/pkg/callbackstruct/push.go @@ -16,24 +16,24 @@ type CallbackBeforePushReq struct { type CallbackBeforePushResp struct { CommonCallbackResp - UserIDList []string `json:"userIDList"` + UserIDs []string `json:"userIDList"` OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"` } type CallbackBeforeSuperGroupOnlinePushReq struct { UserStatusBaseCallback - ClientMsgID string `json:"clientMsgID"` - SendID string `json:"sendID"` - GroupID string `json:"groupID"` - ContentType int32 `json:"contentType"` - SessionType int32 `json:"sessionType"` - AtUserIDList []string `json:"atUserIDList"` - Content string `json:"content"` - Seq int64 `json:"seq"` + ClientMsgID string `json:"clientMsgID"` + SendID string `json:"sendID"` + GroupID string `json:"groupID"` + ContentType int32 `json:"contentType"` + SessionType int32 `json:"sessionType"` + AtUserIDs []string `json:"atUserIDList"` + Content string `json:"content"` + Seq int64 `json:"seq"` } type CallbackBeforeSuperGroupOnlinePushResp struct { CommonCallbackResp - UserIDList []string `json:"userIDList"` + UserIDs []string `json:"userIDList"` OfflinePushInfo *common.OfflinePushInfo `json:"offlinePushInfo"` } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ecf6b7774..1f42218c1 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -129,20 +129,19 @@ type config struct { OpenImPushPort []int `yaml:"openImPushPort"` OpenImConversationPort []int `yaml:"openImConversationPort"` OpenImCachePort []int `yaml:"openImCachePort"` - OpenImRealTimeCommPort []int `yaml:"openImRealTimeCommPort"` + OpenImRtcPort []int `yaml:"openImRtcPort"` } RpcRegisterName struct { - OpenImUserName string `yaml:"openImUserName"` - OpenImFriendName string `yaml:"openImFriendName"` - // OpenImOfflineMessageName string `yaml:"openImOfflineMessageName"` - OpenImMsgName string `yaml:"openImMsgName"` - OpenImPushName string `yaml:"openImPushName"` - OpenImRelayName string `yaml:"openImRelayName"` - OpenImGroupName string `yaml:"openImGroupName"` - OpenImAuthName string `yaml:"openImAuthName"` - OpenImConversationName string `yaml:"openImConversationName"` - OpenImCacheName string `yaml:"openImCacheName"` - OpenImRealTimeCommName string `yaml:"openImRealTimeCommName"` + OpenImUserName string `yaml:"openImUserName"` + OpenImFriendName string `yaml:"openImFriendName"` + OpenImMsgName string `yaml:"openImMsgName"` + OpenImPushName string `yaml:"openImPushName"` + OpenImMessageGatewayName string `yaml:"openImMessageGatewayName"` + OpenImGroupName string `yaml:"openImGroupName"` + OpenImAuthName string `yaml:"openImAuthName"` + OpenImConversationName string `yaml:"openImConversationName"` + OpenImCacheName string `yaml:"openImCacheName"` + OpenImRtcName string `yaml:"openImRtcName"` } Zookeeper struct { Schema string `yaml:"schema"` @@ -481,7 +480,7 @@ type config struct { AuthPrometheusPort []int `yaml:"authPrometheusPort"` PushPrometheusPort []int `yaml:"pushPrometheusPort"` ConversationPrometheusPort []int `yaml:"conversationPrometheusPort"` - RealTimeCommPrometheusPort []int `yaml:"realTimeCommPrometheusPort"` + RtcPrometheusPort []int `yaml:"RtcPrometheusPort"` MessageTransferPrometheusPort []int `yaml:"messageTransferPrometheusPort"` } `yaml:"prometheus"` } diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index c649368f5..6d83cae9c 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -308,17 +308,16 @@ func (r *RedisClient) HandleSignalInfo(ctx context.Context, operationID string, if err := proto.Unmarshal(msg.Content, req); err != nil { return false, err } - //log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String()) - var inviteeUserIDList []string + var inviteeUserIDs []string var isInviteSignal bool switch signalInfo := req.Payload.(type) { case *pbRtc.SignalReq_Invite: - inviteeUserIDList = signalInfo.Invite.Invitation.InviteeUserIDList + inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList isInviteSignal = true case *pbRtc.SignalReq_InviteInGroup: - inviteeUserIDList = signalInfo.InviteInGroup.Invitation.InviteeUserIDList + inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList isInviteSignal = true - if !utils.IsContain(pushToUserID, inviteeUserIDList) { + if !utils.IsContain(pushToUserID, inviteeUserIDs) { return false, nil } case *pbRtc.SignalReq_HungUp, *pbRtc.SignalReq_Cancel, *pbRtc.SignalReq_Reject, *pbRtc.SignalReq_Accept: @@ -327,7 +326,7 @@ func (r *RedisClient) HandleSignalInfo(ctx context.Context, operationID string, return false, nil } if isInviteSignal { - for _, userID := range inviteeUserIDList { + for _, userID := range inviteeUserIDs { timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) if err != nil { return false, err diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index e772d17c6..a52da4a53 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -93,7 +93,7 @@ func (g *GroupController) DismissGroup(ctx context.Context, groupID string) erro } func (g *GroupController) GetGroupIDsByGroupType(ctx context.Context, groupType int) (groupIDs []string, err error) { - return g.database. + return g.database.GetGroupIDsByGroupType(ctx, groupType) } func (g *GroupController) TakeGroupMember(ctx context.Context, groupID string, userID string) (groupMember *relationTb.GroupMemberModel, err error) { diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go new file mode 100644 index 000000000..cca942487 --- /dev/null +++ b/pkg/common/db/controller/push.go @@ -0,0 +1,18 @@ +package controller + +import ( + "Open_IM/pkg/common/db/cache" + "context" +) + +type PushInterface interface { + DelFcmToken(ctx context.Context, userID string, platformID int) error +} + +type PushDataBase struct { + cache cache.Cache +} + +func (p *PushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { + return p.cache.DelFcmToken(ctx, userID, platformID) +} diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 2bcb4056f..06d0dd4c7 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -25,5 +25,5 @@ func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) Co func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { g.client.GetConn() - return []string{} + return []string{}, nil } diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go index fda1e2d48..e5bc03d45 100644 --- a/pkg/common/db/localcache/group.go +++ b/pkg/common/db/localcache/group.go @@ -3,7 +3,7 @@ package localcache import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - discoveryRegistry "Open_IM/pkg/discoveryregistry" + "Open_IM/pkg/discoveryregistry" "Open_IM/pkg/proto/group" "context" "sync" @@ -16,7 +16,7 @@ type GroupLocalCacheInterface interface { type GroupLocalCache struct { lock sync.Mutex cache map[string]GroupMemberIDsHash - client discoveryRegistry.SvcDiscoveryRegistry + client discoveryregistry.SvcDiscoveryRegistry } type GroupMemberIDsHash struct { @@ -24,7 +24,7 @@ type GroupMemberIDsHash struct { userIDs []string } -func NewGroupMemberIDsLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) *GroupLocalCache { +func NewGroupMemberIDsLocalCache(client discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache { return &GroupLocalCache{ cache: make(map[string]GroupMemberIDsHash, 0), client: client, @@ -34,7 +34,7 @@ func NewGroupMemberIDsLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { g.lock.Lock() defer g.lock.Unlock() - conn, err := g.client.GetConn(config.Config.RpcRegisterName.OpenImGroupName, nil) + conn, err := g.client.GetConn(config.Config.RpcRegisterName.OpenImGroupName) if err != nil { return nil, err } diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index e798beb0d..4c83e2747 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -33,7 +33,7 @@ func Get(url string) (response []byte, err error) { } // application/json; charset=utf-8 -func Post(url string, header map[string]string, data interface{}, timeOutSecond int) (content []byte, err error) { +func Post(url string, header map[string]string, data interface{}, timeout int) (content []byte, err error) { jsonStr, err := json.Marshal(data) if err != nil { return nil, err @@ -42,9 +42,12 @@ func Post(url string, header map[string]string, data interface{}, timeOutSecond if err != nil { return nil, err } + for k, v := range header { + req.Header.Set(k, v) + } req.Close = true req.Header.Add("content-type", "application/json; charset=utf-8") - client := &http.Client{Timeout: time.Duration(timeOutSecond) * time.Second} + client := &http.Client{Timeout: time.Duration(timeout) * time.Second} resp, err := client.Do(req) if err != nil { return nil, err diff --git a/pkg/common/prome/prometheus.go b/pkg/common/prome/prometheus.go index f6d70d9b8..20e982e92 100644 --- a/pkg/common/prome/prometheus.go +++ b/pkg/common/prome/prometheus.go @@ -11,10 +11,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -func StartPromeSrv(promethuesPort int) error { +func StartPromeSrv(prometheusPort int) error { if config.Config.Prometheus.Enable { http.Handle("/metrics", promhttp.Handler()) - err := http.ListenAndServe(":"+strconv.Itoa(promethuesPort), nil) + err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil) return err } return nil diff --git a/pkg/discoveryregistry/startegy.go b/pkg/discoveryregistry/startegy.go index 79bde210e..f54d7d80d 100644 --- a/pkg/discoveryregistry/startegy.go +++ b/pkg/discoveryregistry/startegy.go @@ -14,15 +14,3 @@ func (r *Robin) Robin(slice []*grpc.ClientConn) int { } return index } - -type Hash struct { -} - -func (r *Hash) Hash(slice []*grpc.ClientConn) int { - index := r.next - r.next += 1 - if r.next > len(slice)-1 { - r.next = 0 - } - return index -} diff --git a/pkg/proto/relay/relay.pb.go b/pkg/proto/msggateway/relay.pb.go similarity index 100% rename from pkg/proto/relay/relay.pb.go rename to pkg/proto/msggateway/relay.pb.go diff --git a/pkg/proto/relay/relay.proto b/pkg/proto/msggateway/relay.proto similarity index 100% rename from pkg/proto/relay/relay.proto rename to pkg/proto/msggateway/relay.proto diff --git a/pkg/proto/push/push.pb.go b/pkg/proto/push/push.pb.go index 04a475951..5208657b9 100644 --- a/pkg/proto/push/push.pb.go +++ b/pkg/proto/push/push.pb.go @@ -25,9 +25,8 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type PushMsgReq struct { - OperationID string `protobuf:"bytes,1,opt,name=operationID" json:"operationID,omitempty"` - MsgData *sdkws.MsgData `protobuf:"bytes,2,opt,name=msgData" json:"msgData,omitempty"` - PushToUserID string `protobuf:"bytes,3,opt,name=pushToUserID" json:"pushToUserID,omitempty"` + MsgData *sdkws.MsgData `protobuf:"bytes,1,opt,name=msgData" json:"msgData,omitempty"` + SourceID string `protobuf:"bytes,2,opt,name=sourceID" json:"sourceID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -37,7 +36,7 @@ func (m *PushMsgReq) Reset() { *m = PushMsgReq{} } func (m *PushMsgReq) String() string { return proto.CompactTextString(m) } func (*PushMsgReq) ProtoMessage() {} func (*PushMsgReq) Descriptor() ([]byte, []int) { - return fileDescriptor_push_17f752d1b1c8edd5, []int{0} + return fileDescriptor_push_534b6eabcdc561f2, []int{0} } func (m *PushMsgReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PushMsgReq.Unmarshal(m, b) @@ -57,13 +56,6 @@ func (m *PushMsgReq) XXX_DiscardUnknown() { var xxx_messageInfo_PushMsgReq proto.InternalMessageInfo -func (m *PushMsgReq) GetOperationID() string { - if m != nil { - return m.OperationID - } - return "" -} - func (m *PushMsgReq) GetMsgData() *sdkws.MsgData { if m != nil { return m.MsgData @@ -71,15 +63,14 @@ func (m *PushMsgReq) GetMsgData() *sdkws.MsgData { return nil } -func (m *PushMsgReq) GetPushToUserID() string { +func (m *PushMsgReq) GetSourceID() string { if m != nil { - return m.PushToUserID + return m.SourceID } return "" } type PushMsgResp struct { - ResultCode int32 `protobuf:"varint,1,opt,name=ResultCode" json:"ResultCode,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -89,7 +80,7 @@ func (m *PushMsgResp) Reset() { *m = PushMsgResp{} } func (m *PushMsgResp) String() string { return proto.CompactTextString(m) } func (*PushMsgResp) ProtoMessage() {} func (*PushMsgResp) Descriptor() ([]byte, []int) { - return fileDescriptor_push_17f752d1b1c8edd5, []int{1} + return fileDescriptor_push_534b6eabcdc561f2, []int{1} } func (m *PushMsgResp) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PushMsgResp.Unmarshal(m, b) @@ -109,17 +100,9 @@ func (m *PushMsgResp) XXX_DiscardUnknown() { var xxx_messageInfo_PushMsgResp proto.InternalMessageInfo -func (m *PushMsgResp) GetResultCode() int32 { - if m != nil { - return m.ResultCode - } - return 0 -} - type DelUserPushTokenReq struct { - OperationID string `protobuf:"bytes,1,opt,name=operationID" json:"operationID,omitempty"` - UserID string `protobuf:"bytes,2,opt,name=userID" json:"userID,omitempty"` - PlatformID int32 `protobuf:"varint,3,opt,name=platformID" json:"platformID,omitempty"` + UserID string `protobuf:"bytes,1,opt,name=userID" json:"userID,omitempty"` + PlatformID int32 `protobuf:"varint,2,opt,name=platformID" json:"platformID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -129,7 +112,7 @@ func (m *DelUserPushTokenReq) Reset() { *m = DelUserPushTokenReq{} } func (m *DelUserPushTokenReq) String() string { return proto.CompactTextString(m) } func (*DelUserPushTokenReq) ProtoMessage() {} func (*DelUserPushTokenReq) Descriptor() ([]byte, []int) { - return fileDescriptor_push_17f752d1b1c8edd5, []int{2} + return fileDescriptor_push_534b6eabcdc561f2, []int{2} } func (m *DelUserPushTokenReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DelUserPushTokenReq.Unmarshal(m, b) @@ -149,13 +132,6 @@ func (m *DelUserPushTokenReq) XXX_DiscardUnknown() { var xxx_messageInfo_DelUserPushTokenReq proto.InternalMessageInfo -func (m *DelUserPushTokenReq) GetOperationID() string { - if m != nil { - return m.OperationID - } - return "" -} - func (m *DelUserPushTokenReq) GetUserID() string { if m != nil { return m.UserID @@ -171,8 +147,6 @@ func (m *DelUserPushTokenReq) GetPlatformID() int32 { } type DelUserPushTokenResp struct { - ErrCode int32 `protobuf:"varint,1,opt,name=errCode" json:"errCode,omitempty"` - ErrMsg string `protobuf:"bytes,2,opt,name=errMsg" json:"errMsg,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -182,7 +156,7 @@ func (m *DelUserPushTokenResp) Reset() { *m = DelUserPushTokenResp{} } func (m *DelUserPushTokenResp) String() string { return proto.CompactTextString(m) } func (*DelUserPushTokenResp) ProtoMessage() {} func (*DelUserPushTokenResp) Descriptor() ([]byte, []int) { - return fileDescriptor_push_17f752d1b1c8edd5, []int{3} + return fileDescriptor_push_534b6eabcdc561f2, []int{3} } func (m *DelUserPushTokenResp) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DelUserPushTokenResp.Unmarshal(m, b) @@ -202,20 +176,6 @@ func (m *DelUserPushTokenResp) XXX_DiscardUnknown() { var xxx_messageInfo_DelUserPushTokenResp proto.InternalMessageInfo -func (m *DelUserPushTokenResp) GetErrCode() int32 { - if m != nil { - return m.ErrCode - } - return 0 -} - -func (m *DelUserPushTokenResp) GetErrMsg() string { - if m != nil { - return m.ErrMsg - } - return "" -} - func init() { proto.RegisterType((*PushMsgReq)(nil), "push.PushMsgReq") proto.RegisterType((*PushMsgResp)(nil), "push.PushMsgResp") @@ -328,30 +288,26 @@ var _PushMsgService_serviceDesc = grpc.ServiceDesc{ Metadata: "push/push.proto", } -func init() { proto.RegisterFile("push/push.proto", fileDescriptor_push_17f752d1b1c8edd5) } +func init() { proto.RegisterFile("push/push.proto", fileDescriptor_push_534b6eabcdc561f2) } -var fileDescriptor_push_17f752d1b1c8edd5 = []byte{ - // 342 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0x4f, 0x4b, 0xfb, 0x40, - 0x10, 0x25, 0xfd, 0xfd, 0xda, 0xe2, 0x54, 0xb4, 0xae, 0x22, 0x31, 0xa0, 0x96, 0x9c, 0x7a, 0x69, - 0x02, 0xd5, 0x9b, 0x37, 0xcd, 0xc1, 0x1c, 0x82, 0x25, 0xea, 0xc5, 0x4b, 0xd8, 0xda, 0x35, 0x2d, - 0xfd, 0xb3, 0xe3, 0x4e, 0x62, 0xbf, 0x82, 0xe0, 0x97, 0x96, 0xdd, 0x24, 0x1a, 0xab, 0x82, 0x97, - 0x65, 0xe7, 0xcd, 0xdb, 0x79, 0x6f, 0x76, 0x06, 0x76, 0x31, 0xa7, 0xa9, 0xaf, 0x0f, 0x0f, 0x95, - 0xcc, 0x24, 0xfb, 0xaf, 0xef, 0x4e, 0xff, 0x06, 0xc5, 0x6a, 0x10, 0x46, 0x83, 0x5b, 0xa1, 0x5e, - 0x84, 0xf2, 0x71, 0x9e, 0xfa, 0x26, 0xef, 0xd3, 0x64, 0x9e, 0xac, 0xc9, 0x5f, 0x53, 0xc1, 0x77, - 0x5f, 0x2d, 0x80, 0x51, 0x4e, 0xd3, 0x88, 0xd2, 0x58, 0x3c, 0xb3, 0x1e, 0x74, 0x24, 0x0a, 0xc5, - 0xb3, 0x99, 0x5c, 0x85, 0x81, 0x6d, 0xf5, 0xac, 0xfe, 0x56, 0x5c, 0x87, 0xd8, 0x39, 0xb4, 0x97, - 0x94, 0x06, 0x3c, 0xe3, 0x76, 0xa3, 0x67, 0xf5, 0x3b, 0x43, 0xc7, 0x23, 0x23, 0x92, 0x70, 0x9c, - 0x25, 0xc8, 0x15, 0x5f, 0x92, 0x17, 0x15, 0x8c, 0xb8, 0xa2, 0x32, 0x17, 0xb6, 0xb5, 0xb1, 0x3b, - 0x79, 0x4f, 0x42, 0x85, 0x81, 0xfd, 0xcf, 0x14, 0xfe, 0x82, 0xb9, 0x03, 0xe8, 0x7c, 0x38, 0x21, - 0x64, 0x27, 0x00, 0xb1, 0xa0, 0x7c, 0x91, 0x5d, 0xc9, 0x89, 0x30, 0x4e, 0x9a, 0x71, 0x0d, 0x71, - 0x25, 0xec, 0x07, 0x62, 0xa1, 0xdf, 0x8e, 0x4c, 0x95, 0xb9, 0x58, 0xfd, 0xad, 0x83, 0x43, 0x68, - 0xe5, 0x85, 0x8b, 0x86, 0x49, 0x96, 0x91, 0x16, 0xc4, 0x05, 0xcf, 0x9e, 0xa4, 0x5a, 0x96, 0x0e, - 0x9b, 0x71, 0x0d, 0x71, 0xaf, 0xe1, 0xe0, 0xbb, 0x20, 0x21, 0xb3, 0xa1, 0x2d, 0x94, 0xaa, 0xb9, - 0xac, 0x42, 0xad, 0x24, 0x94, 0x8a, 0x28, 0xad, 0x94, 0x8a, 0x68, 0xf8, 0x66, 0xc1, 0x4e, 0xd9, - 0xaa, 0x1e, 0xd0, 0xec, 0x51, 0x30, 0x0f, 0xda, 0x25, 0xc2, 0xba, 0x9e, 0x99, 0xe7, 0xe7, 0x54, - 0x9c, 0xbd, 0x0d, 0x84, 0x90, 0x85, 0xd0, 0xdd, 0x34, 0xc3, 0x8e, 0x0a, 0xda, 0x0f, 0xbf, 0xe2, - 0x38, 0xbf, 0xa5, 0x08, 0x2f, 0x4f, 0x1f, 0x8e, 0xf5, 0xba, 0x24, 0x61, 0x54, 0xdb, 0x13, 0x4d, - 0xbf, 0xc0, 0xb1, 0x66, 0x8e, 0x5b, 0x06, 0x3a, 0x7b, 0x0f, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x44, - 0x0e, 0xd2, 0x6d, 0x02, 0x00, 0x00, +var fileDescriptor_push_534b6eabcdc561f2 = []byte{ + // 279 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0x41, 0x4f, 0xc2, 0x40, + 0x10, 0x85, 0x53, 0xa3, 0x20, 0x43, 0x44, 0x5c, 0x0d, 0xc1, 0x26, 0x2a, 0xe9, 0x45, 0x2e, 0xb4, + 0x09, 0x1e, 0xbd, 0x99, 0x5e, 0x7a, 0xd8, 0x68, 0x56, 0xbd, 0x78, 0x31, 0x05, 0xc7, 0x62, 0x0a, + 0xec, 0xb8, 0xd3, 0xca, 0x8f, 0xf0, 0x4f, 0x9b, 0xdd, 0x56, 0x24, 0xa8, 0x97, 0xa6, 0xf3, 0xcd, + 0xcb, 0x7b, 0x79, 0xb3, 0x70, 0x48, 0x25, 0xcf, 0x22, 0xfb, 0x09, 0xc9, 0xe8, 0x42, 0x8b, 0x5d, + 0xfb, 0xef, 0x5f, 0xde, 0x12, 0x2e, 0x47, 0x89, 0x1c, 0xdd, 0xa3, 0xf9, 0x40, 0x13, 0x51, 0x9e, + 0x45, 0x6e, 0x1f, 0xf1, 0x4b, 0xbe, 0xe2, 0x68, 0xc5, 0x95, 0x3c, 0x50, 0x00, 0x77, 0x25, 0xcf, + 0x24, 0x67, 0x0a, 0xdf, 0xc5, 0x10, 0x9a, 0x0b, 0xce, 0xe2, 0xb4, 0x48, 0xfb, 0xde, 0xc0, 0x1b, + 0xb6, 0xc7, 0x9d, 0xd0, 0xe9, 0x43, 0x59, 0x51, 0xf5, 0xbd, 0x16, 0x3e, 0xec, 0xb3, 0x2e, 0xcd, + 0x14, 0x93, 0xb8, 0xbf, 0x33, 0xf0, 0x86, 0x2d, 0xb5, 0x9e, 0x83, 0x03, 0x68, 0xaf, 0x3d, 0x99, + 0x02, 0x09, 0xc7, 0x31, 0xce, 0x1f, 0x19, 0x8d, 0xa5, 0x0f, 0x3a, 0xc7, 0xa5, 0xcd, 0xea, 0x41, + 0xa3, 0x64, 0x34, 0x49, 0xec, 0xa2, 0x5a, 0xaa, 0x9e, 0xc4, 0x39, 0x00, 0xcd, 0xd3, 0xe2, 0x55, + 0x9b, 0x45, 0xed, 0xbd, 0xa7, 0x36, 0x48, 0xd0, 0x83, 0x93, 0xdf, 0x76, 0x4c, 0xe3, 0x4f, 0x0f, + 0x3a, 0x75, 0xac, 0x2d, 0xfd, 0x36, 0x45, 0x11, 0x42, 0xb3, 0x26, 0xa2, 0x1b, 0xba, 0x1b, 0xfd, + 0x74, 0xf5, 0x8f, 0xb6, 0x08, 0x93, 0x48, 0xa0, 0xbb, 0x6d, 0x2d, 0x4e, 0x2b, 0xd9, 0x1f, 0x0d, + 0x7c, 0xff, 0xbf, 0x15, 0xd3, 0xcd, 0xc5, 0xd3, 0x99, 0x7d, 0x82, 0xe7, 0x44, 0x6e, 0xdc, 0xde, + 0xca, 0xaf, 0x69, 0x62, 0x95, 0x93, 0x86, 0x43, 0x57, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x04, + 0x7d, 0xb3, 0xcd, 0xc1, 0x01, 0x00, 0x00, } diff --git a/pkg/proto/push/push.proto b/pkg/proto/push/push.proto index 56463af2f..596a0ede3 100644 --- a/pkg/proto/push/push.proto +++ b/pkg/proto/push/push.proto @@ -4,46 +4,23 @@ option go_package = "Open_IM/pkg/proto/push;pbPush"; package push; message PushMsgReq { - string operationID = 1; - sdkws.MsgData msgData = 2; - string pushToUserID = 3; -} -message PushMsgResp{ - int32 ResultCode = 1; -} -message DelUserPushTokenReq{ - string operationID = 1; - string userID =2; - int32 platformID = 3; -} -message DelUserPushTokenResp{ - int32 errCode = 1; - string errMsg = 2; + sdkws.MsgData msgData = 1; + string sourceID = 2; } -//message InternalPushMsgReq{ -// int32 ReqIdentifier = 1; -// string Token = 2; -// string SendID = 3; -// string OperationID = 4; -// int32 MsgIncr = 5; -// int32 PlatformID = 6; -// int32 SessionType = 7; -// int32 MsgFrom = 8; -// int32 ContentType = 9; -// string RecvID = 10; -// repeated string ForceList = 11; -// string Content = 12; -// string Options = 13; -// string ClientMsgID = 14; -// string OffLineInfo = 15; -// string Ex = 16; -// -//} +message PushMsgResp{ +} + +message DelUserPushTokenReq{ + string userID = 1; + int32 platformID = 2; +} + +message DelUserPushTokenResp{ +} service PushMsgService { rpc PushMsg(PushMsgReq) returns(PushMsgResp); rpc DelUserPushToken(DelUserPushTokenReq) returns(DelUserPushTokenResp); -// rpc InternalPushMsg(InternalPushMsgReq)returns(PushMsgResp); } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 48a228021..9cc9b6908 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -131,6 +131,7 @@ func DifferenceString(slice1, slice2 []string) []string { } return n } + func OperationIDGenerator() string { return strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10) }