mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-30 22:37:04 +08:00
Merge remote-tracking branch 'origin/main' into seq38
# Conflicts: # go.mod # go.sum # internal/api/friend.go # internal/rpc/group/sync.go # pkg/common/storage/cache/redis/seq.go # pkg/common/storage/controller/group.go # pkg/common/storage/database/mgo/group_member.go # pkg/common/storage/database/name.go
This commit is contained in:
commit
73a9265957
@ -43,7 +43,7 @@ COPY --from=builder $SERVER_DIR/start-config.yml $SERVER_DIR/
|
|||||||
COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/
|
COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/
|
||||||
COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/
|
COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/
|
||||||
|
|
||||||
RUN go get github.com/openimsdk/gomake@v0.0.13
|
RUN go get github.com/openimsdk/gomake@v0.0.14-alpha.5
|
||||||
|
|
||||||
# Set the command to run when the container starts
|
# Set the command to run when the container starts
|
||||||
ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"]
|
ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"]
|
||||||
|
|||||||
@ -13,6 +13,9 @@ afterUpdateUserInfoEx:
|
|||||||
afterSendSingleMsg:
|
afterSendSingleMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
|
# Only the senID/recvID specified in attentionIds will send the callback
|
||||||
|
# if not set, all user messages will be callback
|
||||||
|
attentionIds: []
|
||||||
beforeSendGroupMsg:
|
beforeSendGroupMsg:
|
||||||
enable: false
|
enable: false
|
||||||
timeout: 5
|
timeout: 5
|
||||||
|
|||||||
4
go.mod
4
go.mod
@ -35,7 +35,7 @@ require (
|
|||||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||||
github.com/kelindar/bitmap v1.5.2
|
github.com/kelindar/bitmap v1.5.2
|
||||||
github.com/likexian/gokit v0.25.13
|
github.com/likexian/gokit v0.25.13
|
||||||
github.com/openimsdk/gomake v0.0.13
|
github.com/openimsdk/gomake v0.0.14-alpha.5
|
||||||
github.com/redis/go-redis/v9 v9.4.0
|
github.com/redis/go-redis/v9 v9.4.0
|
||||||
github.com/robfig/cron/v3 v3.0.1
|
github.com/robfig/cron/v3 v3.0.1
|
||||||
github.com/shirou/gopsutil v3.21.11+incompatible
|
github.com/shirou/gopsutil v3.21.11+incompatible
|
||||||
@ -176,5 +176,3 @@ require (
|
|||||||
golang.org/x/crypto v0.21.0 // indirect
|
golang.org/x/crypto v0.21.0 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol
|
|
||||||
|
|||||||
12
go.sum
12
go.sum
@ -268,12 +268,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
|||||||
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
|
||||||
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.22 h1:kifZWVNDkg9diXFJUJ/Q9xFc80cveBhc+1dUXcE9xHQ=
|
github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M=
|
||||||
github.com/openimsdk/protocol v0.0.69-alpha.22/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.39 h1:bl5+q7xHsc/j1NnkN8/gYmn23RsNNbRizDY58d2EY1w=
|
github.com/openimsdk/tools v0.0.49-alpha.28 h1:1CfdFxvKzyOIvgNMVMq4ZB2upAJ0evLbbigOhWQzhu8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.39/go.mod h1:zc0maZ2ohXlHd0ylY5JnCE8uqq/hslhcfcKa6iO5PCU=
|
github.com/openimsdk/tools v0.0.49-alpha.28/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
|||||||
@ -67,6 +67,7 @@ func (o *GroupApi) GetGroupUsersReqApplicationList(c *gin.Context) {
|
|||||||
|
|
||||||
func (o *GroupApi) GetGroupsInfo(c *gin.Context) {
|
func (o *GroupApi) GetGroupsInfo(c *gin.Context) {
|
||||||
a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c)
|
a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c)
|
||||||
|
//a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupsInfo))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *GroupApi) KickGroupMember(c *gin.Context) {
|
func (o *GroupApi) KickGroupMember(c *gin.Context) {
|
||||||
@ -75,6 +76,7 @@ func (o *GroupApi) KickGroupMember(c *gin.Context) {
|
|||||||
|
|
||||||
func (o *GroupApi) GetGroupMembersInfo(c *gin.Context) {
|
func (o *GroupApi) GetGroupMembersInfo(c *gin.Context) {
|
||||||
a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c)
|
a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c)
|
||||||
|
//a2r.Call(group.GroupClient.GetGroupMembersInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupMembersInfo))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *GroupApi) GetGroupMemberList(c *gin.Context) {
|
func (o *GroupApi) GetGroupMemberList(c *gin.Context) {
|
||||||
|
|||||||
@ -101,6 +101,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
|
|||||||
SendTime: params.SendTime,
|
SendTime: params.SendTime,
|
||||||
Options: options,
|
Options: options,
|
||||||
OfflinePushInfo: params.OfflinePushInfo,
|
OfflinePushInfo: params.OfflinePushInfo,
|
||||||
|
Ex: params.Ex,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return &pbData
|
return &pbData
|
||||||
|
|||||||
@ -180,7 +180,7 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
|
||||||
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
|
||||||
var pushToUserIDs []string
|
var pushToUserIDs []string
|
||||||
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
|
if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
|
||||||
&pushToUserIDs); err != nil {
|
&pushToUserIDs); err != nil {
|
||||||
|
|||||||
@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
userRpcClient: &userRpcClient,
|
userRpcClient: &userRpcClient,
|
||||||
RegisterCenter: client,
|
RegisterCenter: client,
|
||||||
authDatabase: controller.NewAuthDatabase(
|
authDatabase: controller.NewAuthDatabase(
|
||||||
redis2.NewTokenCacheModel(rdb),
|
redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire),
|
||||||
config.Share.Secret,
|
config.Share.Secret,
|
||||||
config.RpcConfig.TokenPolicy.Expire,
|
config.RpcConfig.TokenPolicy.Expire,
|
||||||
),
|
),
|
||||||
|
|||||||
@ -17,17 +17,18 @@ package group
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||||
"math/big"
|
|
||||||
"math/rand"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
@ -531,6 +532,14 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
|
|||||||
if datautil.Contain(opUserID, req.KickedUserIDs...) {
|
if datautil.Contain(opUserID, req.KickedUserIDs...) {
|
||||||
return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs")
|
return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs")
|
||||||
}
|
}
|
||||||
|
owner, err := s.db.TakeGroupOwner(ctx, req.GroupID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if datautil.Contain(owner.UserID, req.KickedUserIDs...) {
|
||||||
|
return nil, errs.ErrArgs.WrapMsg("ownerUID can not Kick")
|
||||||
|
}
|
||||||
|
|
||||||
members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID))
|
members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -590,7 +599,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
|
|||||||
FaceURL: group.FaceURL,
|
FaceURL: group.FaceURL,
|
||||||
OwnerUserID: ownerUserID,
|
OwnerUserID: ownerUserID,
|
||||||
CreateTime: group.CreateTime.UnixMilli(),
|
CreateTime: group.CreateTime.UnixMilli(),
|
||||||
MemberCount: num,
|
MemberCount: num - uint32(len(req.KickedUserIDs)),
|
||||||
Ex: group.Ex,
|
Ex: group.Ex,
|
||||||
Status: group.Status,
|
Status: group.Status,
|
||||||
CreatorUserID: group.CreatorUserID,
|
CreatorUserID: group.CreatorUserID,
|
||||||
|
|||||||
@ -36,6 +36,12 @@ import (
|
|||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// GroupApplicationReceiver
|
||||||
|
const (
|
||||||
|
applicantReceiver = iota
|
||||||
|
adminReceiver
|
||||||
|
)
|
||||||
|
|
||||||
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
|
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
|
||||||
return &GroupNotificationSender{
|
return &GroupNotificationSender{
|
||||||
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
|
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
|
||||||
@ -418,15 +424,17 @@ func (g *GroupNotificationSender) GroupApplicationAcceptedNotification(ctx conte
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, HandleMsg: req.HandledMsg}
|
|
||||||
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
var opUser *sdkws.GroupMemberFullInfo
|
||||||
|
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, userID := range append(userIDs, req.FromUserID) {
|
for _, userID := range append(userIDs, req.FromUserID) {
|
||||||
|
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
|
||||||
if userID == req.FromUserID {
|
if userID == req.FromUserID {
|
||||||
tips.ReceiverAs = 0
|
tips.ReceiverAs = applicantReceiver
|
||||||
} else {
|
} else {
|
||||||
tips.ReceiverAs = 1
|
tips.ReceiverAs = adminReceiver
|
||||||
}
|
}
|
||||||
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
|
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationAcceptedNotification, tips)
|
||||||
}
|
}
|
||||||
@ -449,15 +457,17 @@ func (g *GroupNotificationSender) GroupApplicationRejectedNotification(ctx conte
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tips := &sdkws.GroupApplicationRejectedTips{Group: group, HandleMsg: req.HandledMsg}
|
|
||||||
if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
|
var opUser *sdkws.GroupMemberFullInfo
|
||||||
|
if err = g.fillOpUser(ctx, &opUser, group.GroupID); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, userID := range append(userIDs, req.FromUserID) {
|
for _, userID := range append(userIDs, req.FromUserID) {
|
||||||
|
tips := &sdkws.GroupApplicationAcceptedTips{Group: group, OpUser: opUser, HandleMsg: req.HandledMsg}
|
||||||
if userID == req.FromUserID {
|
if userID == req.FromUserID {
|
||||||
tips.ReceiverAs = 0
|
tips.ReceiverAs = applicantReceiver
|
||||||
} else {
|
} else {
|
||||||
tips.ReceiverAs = 1
|
tips.ReceiverAs = adminReceiver
|
||||||
}
|
}
|
||||||
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
|
g.Notification(ctx, mcontext.GetOpUserID(ctx), userID, constant.GroupApplicationRejectedNotification, tips)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -82,6 +82,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
if vl.LogLen > 0 {
|
||||||
|
hasGroupUpdate = true
|
||||||
|
}
|
||||||
return vl, nil
|
return vl, nil
|
||||||
},
|
},
|
||||||
CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache,
|
CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache,
|
||||||
|
|||||||
@ -83,6 +83,11 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config
|
|||||||
if msg.MsgData.ContentType == constant.Typing {
|
if msg.MsgData.ContentType == constant.Typing {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// According to the attentionIds configuration, only some users are sent
|
||||||
|
attentionIds := after.AttentionIds
|
||||||
|
if attentionIds != nil && !datautil.Contain(msg.MsgData.RecvID, attentionIds...) && !datautil.Contain(msg.MsgData.SendID, attentionIds...) {
|
||||||
|
return
|
||||||
|
}
|
||||||
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
|
||||||
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
|
||||||
RecvID: msg.MsgData.RecvID,
|
RecvID: msg.MsgData.RecvID,
|
||||||
|
|||||||
@ -16,13 +16,15 @@ package msg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
|
||||||
pbmsg "github.com/openimsdk/protocol/msg"
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) {
|
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) {
|
||||||
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
|
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
|
||||||
if err != nil {
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil
|
return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil
|
||||||
|
|||||||
@ -55,6 +55,9 @@ type SendMsg struct {
|
|||||||
|
|
||||||
// OfflinePushInfo contains information for offline push notifications.
|
// OfflinePushInfo contains information for offline push notifications.
|
||||||
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
|
OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"`
|
||||||
|
|
||||||
|
// Ex stores extended fields
|
||||||
|
Ex string `json:"ex"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat.
|
// SendMsgReq extends SendMsg with the requirement of RecvID when SessionType indicates a one-on-one or notification chat.
|
||||||
|
|||||||
@ -341,8 +341,9 @@ type BeforeConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type AfterConfig struct {
|
type AfterConfig struct {
|
||||||
Enable bool `mapstructure:"enable"`
|
Enable bool `mapstructure:"enable"`
|
||||||
Timeout int `mapstructure:"timeout"`
|
Timeout int `mapstructure:"timeout"`
|
||||||
|
AttentionIds []string `mapstructure:"attentionIds"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Share struct {
|
type Share struct {
|
||||||
|
|||||||
@ -158,7 +158,6 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
case <-netDone:
|
case <-netDone:
|
||||||
close(netDone)
|
|
||||||
return netErr
|
return netErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
0
pkg/common/storage/cache/redis/seq.go
vendored
Normal file
0
pkg/common/storage/cache/redis/seq.go
vendored
Normal file
30
pkg/common/storage/cache/redis/token.go
vendored
30
pkg/common/storage/cache/redis/token.go
vendored
@ -21,22 +21,36 @@ import (
|
|||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tokenCache struct {
|
type tokenCache struct {
|
||||||
rdb redis.UniversalClient
|
rdb redis.UniversalClient
|
||||||
|
accessExpire time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTokenCacheModel(rdb redis.UniversalClient) cache.TokenModel {
|
func NewTokenCacheModel(rdb redis.UniversalClient, accessExpire int64) cache.TokenModel {
|
||||||
return &tokenCache{
|
c := &tokenCache{rdb: rdb}
|
||||||
rdb: rdb,
|
c.accessExpire = c.getExpireTime(accessExpire)
|
||||||
}
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *tokenCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
func (c *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||||
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
|
return errs.Wrap(c.rdb.HSet(ctx, cachekey.GetTokenKey(userID, platformID), token, flag).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTokenFlagEx set token and flag with expire time
|
||||||
|
func (c *tokenCache) SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error {
|
||||||
|
key := cachekey.GetTokenKey(userID, platformID)
|
||||||
|
if err := c.rdb.HSet(ctx, key, token, flag).Err(); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
if err := c.rdb.Expire(ctx, key, c.accessExpire).Err(); err != nil {
|
||||||
|
return errs.Wrap(err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
|
func (c *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error) {
|
||||||
m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
|
m, err := c.rdb.HGetAll(ctx, cachekey.GetTokenKey(userID, platformID)).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -61,3 +75,7 @@ func (c *tokenCache) SetTokenMapByUidPid(ctx context.Context, userID string, pla
|
|||||||
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
|
func (c *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error {
|
||||||
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
|
return errs.Wrap(c.rdb.HDel(ctx, cachekey.GetTokenKey(userID, platformID), fields...).Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *tokenCache) getExpireTime(t int64) time.Duration {
|
||||||
|
return time.Hour * 24 * time.Duration(t)
|
||||||
|
}
|
||||||
|
|||||||
4
pkg/common/storage/cache/token.go
vendored
4
pkg/common/storage/cache/token.go
vendored
@ -5,7 +5,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TokenModel interface {
|
type TokenModel interface {
|
||||||
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
|
SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
|
||||||
|
// SetTokenFlagEx set token and flag with expire time
|
||||||
|
SetTokenFlagEx(ctx context.Context, userID string, platformID int, token string, flag int) error
|
||||||
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
|
GetTokensWithoutError(ctx context.Context, userID string, platformID int) (map[string]int, error)
|
||||||
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
SetTokenMapByUidPid(ctx context.Context, userID string, platformID int, m map[string]int) error
|
||||||
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
DeleteTokenByUidPid(ctx context.Context, userID string, platformID int, fields []string) error
|
||||||
|
|||||||
@ -55,6 +55,7 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p
|
|||||||
|
|
||||||
// Create Token.
|
// Create Token.
|
||||||
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
|
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
|
||||||
|
isCreate := true // flag is create or update
|
||||||
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
|
tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@ -65,6 +66,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
|
|||||||
if err != nil || v != constant.NormalToken {
|
if err != nil || v != constant.NormalToken {
|
||||||
deleteTokenKey = append(deleteTokenKey, k)
|
deleteTokenKey = append(deleteTokenKey, k)
|
||||||
}
|
}
|
||||||
|
if v == constant.NormalToken {
|
||||||
|
isCreate = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(deleteTokenKey) != 0 {
|
if len(deleteTokenKey) != 0 {
|
||||||
err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
|
err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey)
|
||||||
@ -79,5 +83,17 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errs.WrapMsg(err, "token.SignedString")
|
return "", errs.WrapMsg(err, "token.SignedString")
|
||||||
}
|
}
|
||||||
return tokenString, a.cache.AddTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken)
|
|
||||||
|
if isCreate {
|
||||||
|
// should create,should specify expiration time
|
||||||
|
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// should update
|
||||||
|
if err = a.cache.SetTokenFlag(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tokenString, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -335,9 +335,6 @@ func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID s
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(friends) != len(friendUserIDs) {
|
|
||||||
err = errs.ErrRecordNotFound.Wrap()
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -378,6 +378,7 @@ func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, u
|
|||||||
DelGroupMembersInfo(groupID, userIDs...).
|
DelGroupMembersInfo(groupID, userIDs...).
|
||||||
DelGroupAllRoleLevel(groupID).
|
DelGroupAllRoleLevel(groupID).
|
||||||
DelMaxGroupMemberVersion(groupID).
|
DelMaxGroupMemberVersion(groupID).
|
||||||
|
DelMaxJoinGroupVersion(userIDs...).
|
||||||
ChainExecDel(ctx)
|
ChainExecDel(ctx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -400,10 +401,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string
|
|||||||
|
|
||||||
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
|
func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error {
|
||||||
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
|
return g.ctxTx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel); err != nil {
|
if err := g.groupMemberDB.UpdateUserRoleLevels(ctx, groupID, oldOwnerUserID, roleLevel, newOwnerUserID, constant.GroupOwner); err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, newOwnerUserID, constant.GroupOwner); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c := g.cache.CloneGroupCache()
|
c := g.cache.CloneGroupCache()
|
||||||
|
|||||||
@ -25,6 +25,7 @@ type GroupMember interface {
|
|||||||
Delete(ctx context.Context, groupID string, userIDs []string) (err error)
|
Delete(ctx context.Context, groupID string, userIDs []string) (err error)
|
||||||
Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error)
|
Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error)
|
||||||
UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error
|
UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error
|
||||||
|
UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error
|
||||||
FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error)
|
FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error)
|
||||||
Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error)
|
Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error)
|
||||||
TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error)
|
TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error)
|
||||||
|
|||||||
@ -115,11 +115,28 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s
|
|||||||
|
|
||||||
func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error {
|
func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error {
|
||||||
return mongoutil.IncrVersion(func() error {
|
return mongoutil.IncrVersion(func() error {
|
||||||
return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel})
|
return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
|
||||||
|
bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
|
||||||
}, func() error {
|
}, func() error {
|
||||||
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
|
return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
|
||||||
|
return mongoutil.IncrVersion(func() error {
|
||||||
|
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": firstUserID},
|
||||||
|
bson.M{"$set": bson.M{"role_level": firstUserRoleLevel}}, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": secondUserID},
|
||||||
|
bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, func() error {
|
||||||
|
return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) {
|
func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) {
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
|
|||||||
39
pkg/common/storage/database/mgo/version_test.go
Normal file
39
pkg/common/storage/database/mgo/version_test.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package mgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Result[V any](val V, err error) V {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func Check(err error) {
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestName(t *testing.T) {
|
||||||
|
cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
|
||||||
|
coll := cli.Database("openim_v3").Collection("version_test")
|
||||||
|
tmp, err := NewVersionLog(coll)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
vl := tmp.(*VersionLogMgo)
|
||||||
|
res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
t.Log(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Logf("%+v", res)
|
||||||
|
}
|
||||||
@ -16,15 +16,19 @@ package rpccache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
pbconversation "github.com/openimsdk/protocol/conversation"
|
pbconversation "github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
conversationWorkerCount = 20
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
|
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
|
||||||
@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
|
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
|
||||||
conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs))
|
var (
|
||||||
|
conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs))
|
||||||
|
conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs))
|
||||||
|
)
|
||||||
|
|
||||||
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
g.SetLimit(conversationWorkerCount)
|
||||||
|
|
||||||
for _, conversationID := range conversationIDs {
|
for _, conversationID := range conversationIDs {
|
||||||
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
|
conversationID := conversationID
|
||||||
if err != nil {
|
g.Go(func() error {
|
||||||
if errs.ErrRecordNotFound.Is(err) {
|
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
|
||||||
continue
|
if err != nil {
|
||||||
|
if errs.ErrRecordNotFound.Is(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil, err
|
conversationsChan <- conversation
|
||||||
}
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := g.Wait(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
close(conversationsChan)
|
||||||
|
for conversation := range conversationsChan {
|
||||||
conversations = append(conversations, conversation)
|
conversations = append(conversations, conversation)
|
||||||
}
|
}
|
||||||
return conversations, nil
|
return conversations, nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user