notification

This commit is contained in:
wangchuxiao 2023-04-23 14:21:36 +08:00
parent e3d907f113
commit 4dccfd0a81
9 changed files with 276 additions and 236 deletions

View File

@ -3,13 +3,14 @@ package group
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"

View File

@ -19,6 +19,22 @@ type UserModel struct {
GlobalRecvMsgOpt int32 `gorm:"column:global_recv_msg_opt"`
}
func (u *UserModel) GetNickname() string {
return UserModelTableName
}
func (u *UserModel) GetFaceURL() string {
return u.FaceURL
}
func (u *UserModel) GetUserID() string {
return u.UserID
}
func (u *UserModel) GetEx() string {
return u.Ex
}
func (UserModel) TableName() string {
return UserModelTableName
}

View File

@ -1,133 +1,37 @@
package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"strconv"
"strings"
)
import "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
const (
singleGocMsgNum = 5000
Msg = "msg"
OldestList = 0
NewestList = -1
)
type MsgDocModel struct {
DocID string `bson:"uid"`
Msg []MsgInfoModel `bson:"msg"`
type MsgModel struct {
SendID string `bson:"send_id"`
RecvID string `bson:"recv_id"`
GroupID string `bson:"group_id"`
ClientMsgID string `bson:"client_msg_id"` // 客户端消息ID
ServerMsgID string `bson:"server_msg_id"` // 服务端消息ID
SenderPlatformID int32 `bson:"sender_platform_id"`
SenderNickname string `bson:"sender_nickname"`
SenderFaceURL string `bson:"sender_face_url"`
SessionType int32 `bson:"session_type"`
MsgFrom int32 `bson:"msg_from"`
ContentType int32 `bson:"contentType"`
Content []byte `bson:"content"`
Seq int64 `bson:"seq"`
SendTime int64 `bson:"sendTime"`
CreateTime int64 `bson:"createTime"`
Status int32 `bson:"status"`
Options map[string]bool `bson:"options"`
OfflinePushInfo *sdkws.OfflinePushInfo `bson:"offlinePushInfo"`
AtUserIDList []string `bson:"atUserIDList"`
MsgDataList []byte `bson:"msgDataList"`
AttachedInfo string `bson:"attachedInfo"`
Ex string `bson:"ex"`
}
type MsgInfoModel struct {
SendTime int64 `bson:"sendtime"`
Msg []byte `bson:"msg"`
type ReadDiffusionMsgModel struct {
*MsgModel
}
type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error)
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error
}
func (MsgDocModel) TableName() string {
return Msg
}
func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum
}
func (m *MsgDocModel) IsFull() bool {
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
if index == 0 {
if len(m.Msg) >= singleGocMsgNum-1 {
return true
}
}
if len(m.Msg) >= singleGocMsgNum {
return true
}
return false
}
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix)
}
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID)
}
return seqUserIDs
}
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix)
}
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok {
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
}
}
return t
}
func (m MsgDocModel) getMsgIndex(seq uint32) int {
seqSuffix := seq / singleGocMsgNum
var index uint32
if seqSuffix == 0 {
index = (seq - seqSuffix*singleGocMsgNum) - 1
} else {
index = seq - seqSuffix*singleGocMsgNum
}
return int(index)
}
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
msg.GroupID = groupID
msg.SessionType = constant.SuperGroupChatType
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
type WriteDiffusionMsgModel struct {
*MsgModel
UserID string `bson:"user_id"`
}

View File

@ -0,0 +1,133 @@
package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"strconv"
"strings"
)
const (
singleGocMsgNum = 5000
Msg = "msg"
OldestList = 0
NewestList = -1
)
type MsgDocModel struct {
DocID string `bson:"uid"`
Msg []MsgInfoModel `bson:"msg"`
}
type MsgInfoModel struct {
SendTime int64 `bson:"sendtime"`
Msg []byte `bson:"msg"`
}
type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)
GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error)
Delete(ctx context.Context, docIDs []string) error
GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error)
UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error
}
func (MsgDocModel) TableName() string {
return Msg
}
func (MsgDocModel) GetSingleGocMsgNum() int64 {
return singleGocMsgNum
}
func (m *MsgDocModel) IsFull() bool {
index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1])
if index == 0 {
if len(m.Msg) >= singleGocMsgNum-1 {
return true
}
}
if len(m.Msg) >= singleGocMsgNum {
return true
}
return false
}
func (m MsgDocModel) GetDocID(sourceID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.indexGen(sourceID, seqSuffix)
}
func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string {
seqMaxSuffix := maxSeq / singleGocMsgNum
var seqUserIDs []string
for i := 0; i <= int(seqMaxSuffix); i++ {
seqUserID := m.indexGen(userID, int64(i))
seqUserIDs = append(seqUserIDs, seqUserID)
}
return seqUserIDs
}
func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return m.superGroupIndexGen(groupID, seqSuffix)
}
func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string {
return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ {
docID := m.GetDocID(sourceID, seqs[i])
if value, ok := t[docID]; !ok {
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
}
}
return t
}
func (m MsgDocModel) getMsgIndex(seq uint32) int {
seqSuffix := seq / singleGocMsgNum
var index uint32
if seqSuffix == 0 {
index = (seq - seqSuffix*singleGocMsgNum) - 1
} else {
index = seq - seqSuffix*singleGocMsgNum
}
return int(index)
}
func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string {
return sourceID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}
func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msg := new(sdkws.MsgData)
msg.Seq = v
msg.GroupID = groupID
msg.SessionType = constant.SuperGroupChatType
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}

View File

@ -1,63 +0,0 @@
package check
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
discoveryRegistry "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"google.golang.org/grpc"
)
type MessageGateWayRpcClient struct {
zk discoveryRegistry.SvcDiscoveryRegistry
}
func NewMessageGateWayRpcClient(zk discoveryRegistry.SvcDiscoveryRegistry) *MessageGateWayRpcClient {
return &MessageGateWayRpcClient{
zk: zk,
}
}
func (m *MessageGateWayRpcClient) GetFriendsInfo(ctx context.Context, ownerUserID, friendUserID string) (resp *sdkws.FriendInfo, err error) {
cc, err := m.getConn()
if err != nil {
return nil, err
}
r, err := friend.NewFriendClient(cc).GetDesignatedFriends(ctx, &friend.GetDesignatedFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{friendUserID}})
if err != nil {
return nil, err
}
resp = r.FriendsInfo[0]
return
}
func (m *MessageGateWayRpcClient) getConn() (*grpc.ClientConn, error) {
return m.zk.GetConn(config.Config.RpcRegisterName.OpenImMessageGatewayName)
}
// possibleFriendUserID是否在userID的好友中
func (m *MessageGateWayRpcClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) {
cc, err := m.getConn()
if err != nil {
return false, err
}
resp, err := friend.NewFriendClient(cc).IsFriend(ctx, &friend.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})
if err != nil {
return false, err
}
return resp.InUser1Friends, nil
}
func (m *MessageGateWayRpcClient) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
cc, err := m.getConn()
if err != nil {
return nil, err
}
req := friend.GetFriendIDsReq{UserID: ownerUserID}
resp, err := friend.NewFriendClient(cc).GetFriendIDs(ctx, &req)
if err != nil {
return nil, err
}
return resp.FriendIDs, err
}

View File

@ -5,27 +5,29 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
sdk "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
utils "github.com/OpenIMSDK/open_utils"
)
func FriendPb2DB(friend *sdkws.FriendInfo) (*relation.FriendModel, error) {
func FriendPb2DB(friend *sdkws.FriendInfo) *relation.FriendModel {
dbFriend := &relation.FriendModel{}
utils.CopyStructFields(dbFriend, friend)
dbFriend.FriendUserID = friend.FriendUser.UserID
dbFriend.CreateTime = utils.UnixSecondToTime(friend.CreateTime)
return dbFriend, nil
return dbFriend
}
func FriendDB2Pb(ctx context.Context, friendDB *relation.FriendModel, fn func(ctx context.Context, userID string) (*sdkws.UserInfo, error)) (*sdk.FriendInfo, error) {
pbfriend := &sdk.FriendInfo{FriendUser: &sdk.UserInfo{}}
func FriendDB2Pb(ctx context.Context, friendDB *relation.FriendModel, getUser func(ctx context.Context, userIDs []string) ([]rpcclient.CommonUser, error)) (*sdkws.FriendInfo, error) {
pbfriend := &sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}}
utils.CopyStructFields(pbfriend, friendDB)
user, err := fn(ctx, friendDB.FriendUserID)
users, err := getUser(ctx, []string{friendDB.FriendUserID})
if err != nil {
return nil, err
}
utils.CopyStructFields(pbfriend.FriendUser, user)
pbfriend.FriendUser.UserID = users[0].GetUserID()
pbfriend.FriendUser.Nickname = users[0].GetNickname()
pbfriend.FriendUser.FaceURL = users[0].GetFaceURL()
pbfriend.FriendUser.Ex = users[0].GetEx()
pbfriend.CreateTime = friendDB.CreateTime.Unix()
pbfriend.FriendUser.CreateTime = friendDB.CreateTime.Unix()
return pbfriend, nil
}

View File

@ -11,17 +11,6 @@ type MetaClient struct {
rpcRegisterName string
}
type NotificationMsg struct {
SendID string
RecvID string
Content []byte // sdkws.TipsComm
MsgFrom int32
ContentType int32
SessionType int32
SenderNickname string
SenderFaceURL string
}
func NewMetaClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *MetaClient {
return &MetaClient{
client: client,
@ -36,3 +25,21 @@ func (m *MetaClient) getConn() (*grpc.ClientConn, error) {
func (m *MetaClient) getRpcRegisterName() string {
return m.rpcRegisterName
}
type NotificationMsg struct {
SendID string
RecvID string
Content []byte
MsgFrom int32
ContentType int32
SessionType int32
SenderNickname string
SenderFaceURL string
}
type CommonUser interface {
GetNickname() string
GetFaceURL() string
GetUserID() string
GetEx() string
}

View File

@ -5,30 +5,83 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
pbFriend "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/friend"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/convert"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
)
type FriendNotificationSender struct {
*rpcclient.MsgClient
getUsersInfoMap func(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.UserInfo, error)
getFriendsInfo func(ctx context.Context, ownerUserID, friendUserID string) (resp *sdkws.FriendInfo, err error)
getUsersInfo func(ctx context.Context, userIDs []string, complete bool) ([]*sdkws.UserInfo, error)
// 找不到报错
getUsersInfo func(ctx context.Context, userIDs []string) ([]rpcclient.CommonUser, error)
// db controller
db controller.FriendDatabase
}
func NewFriendNotificationSender(client discoveryregistry.SvcDiscoveryRegistry, getUsersInfoMap func(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.UserInfo, error)) *FriendNotificationSender {
return &FriendNotificationSender{
MsgClient: rpcclient.NewMsgClient(client),
getUsersInfoMap: getUsersInfoMap,
type friendNotificationSenderOptions func(*FriendNotificationSender)
func WithDBFunc(fn func(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error)) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []rpcclient.CommonUser, err error) {
users, err := fn(ctx, userIDs)
if err != nil {
return nil, err
}
for _, user := range users {
result = append(result, user)
}
return result, nil
}
s.getUsersInfo = f
}
}
func WithRpcFunc(fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error)) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []rpcclient.CommonUser, err error) {
users, err := fn(ctx, userIDs)
if err != nil {
return nil, err
}
for _, user := range users {
result = append(result, user)
}
return result, err
}
s.getUsersInfo = f
}
}
func NewFriendNotificationSender(client discoveryregistry.SvcDiscoveryRegistry, opts ...friendNotificationSenderOptions) *FriendNotificationSender {
f := &FriendNotificationSender{
MsgClient: rpcclient.NewMsgClient(client),
}
for _, opt := range opts {
opt(f)
}
return f
}
func (c *FriendNotificationSender) getUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) {
users, err := c.getUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
result := make(map[string]*sdkws.UserInfo)
for _, user := range users {
result[user.GetUserID()] = user.(*sdkws.UserInfo)
}
return result, nil
}
func (c *FriendNotificationSender) getFromToUserNickname(ctx context.Context, fromUserID, toUserID string) (string, string, error) {
users, err := c.getUsersInfoMap(ctx, []string{fromUserID, toUserID}, true)
users, err := c.getUsersInfoMap(ctx, []string{fromUserID, toUserID})
if err != nil {
return "", "", nil
}
@ -80,7 +133,6 @@ func (c *FriendNotificationSender) friendNotification(ctx context.Context, fromU
default:
return
}
var n rpcclient.NotificationMsg
n.SendID = fromUserID
n.RecvID = toUserID
@ -119,20 +171,20 @@ func (c *FriendNotificationSender) FriendApplicationRefusedNotification(ctx cont
func (c *FriendNotificationSender) FriendAddedNotification(ctx context.Context, operationID, opUserID, fromUserID, toUserID string) {
friendAddedTips := sdkws.FriendAddedTips{Friend: &sdkws.FriendInfo{}, OpUser: &sdkws.PublicUserInfo{}}
user, err := c.getUsersInfo(ctx, []string{opUserID}, true)
user, err := c.getUsersInfo(ctx, []string{opUserID})
if err != nil {
return
}
friendAddedTips.OpUser.UserID = user[0].UserID
friendAddedTips.OpUser.Ex = user[0].Ex
friendAddedTips.OpUser.Nickname = user[0].Nickname
friendAddedTips.OpUser.FaceURL = user[0].FaceURL
friendAddedTips.OpUser.UserID = user[0].GetUserID()
friendAddedTips.OpUser.Ex = user[0].GetEx()
friendAddedTips.OpUser.Nickname = user[0].GetNickname()
friendAddedTips.OpUser.FaceURL = user[0].GetFaceURL()
friend, err := c.getFriendsInfo(ctx, fromUserID, toUserID)
friends, err := c.db.FindFriendsWithError(ctx, fromUserID, []string{toUserID})
if err != nil {
return
}
friendAddedTips.Friend = friend
friendAddedTips.Friend, err = convert.FriendDB2Pb(ctx, friends[0], c.getUsersInfo)
c.friendNotification(ctx, fromUserID, toUserID, constant.FriendAddedNotification, &friendAddedTips)
}

View File

@ -1,12 +0,0 @@
package notification2
type NotificationMsg struct {
SendID string
RecvID string
Content []byte // sdkws.TipsComm
MsgFrom int32
ContentType int32
SessionType int32
SenderNickname string
SenderFaceURL string
}