mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-27 20:30:40 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
f985689a39
@ -1,4 +1,4 @@
|
|||||||
package rpc_server
|
package rpcserver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/internal/common/network"
|
"Open_IM/internal/common/network"
|
||||||
@ -31,12 +31,11 @@ func NewRpcServer(registerIPInConfig string, port int, registerName string, zkSe
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = zkClient.Register(s.RegisterName, registerIP, s.Port)
|
s.RegisterCenter = zkClient
|
||||||
|
err = s.RegisterCenter.Register(s.RegisterName, registerIP, s.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.RegisterCenter = zkClient
|
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
@ -20,7 +20,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func NewRpcAuthServer(port int) *rpcAuth {
|
func NewRpcAuthServer(port int) *rpcAuth {
|
||||||
r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -35,7 +35,7 @@ func NewRpcAuthServer(port int) *rpcAuth {
|
|||||||
func (s *rpcAuth) Run() {
|
func (s *rpcAuth) Run() {
|
||||||
operationID := utils.OperationIDGenerator()
|
operationID := utils.OperationIDGenerator()
|
||||||
log.NewInfo(operationID, "rpc auth start...")
|
log.NewInfo(operationID, "rpc auth start...")
|
||||||
listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port)
|
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -141,6 +141,6 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in
|
|||||||
}
|
}
|
||||||
|
|
||||||
type rpcAuth struct {
|
type rpcAuth struct {
|
||||||
*rpc_server.RpcServer
|
*rpcserver.RpcServer
|
||||||
controller.AuthInterface
|
controller.AuthInterface
|
||||||
}
|
}
|
||||||
|
@ -23,14 +23,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type friendServer struct {
|
type friendServer struct {
|
||||||
*rpc_server.RpcServer
|
*rpcserver.RpcServer
|
||||||
|
|
||||||
controller.FriendInterface
|
controller.FriendInterface
|
||||||
controller.BlackInterface
|
controller.BlackInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFriendServer(port int) *friendServer {
|
func NewFriendServer(port int) *friendServer {
|
||||||
r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -64,7 +64,7 @@ func NewFriendServer(port int) *friendServer {
|
|||||||
func (s *friendServer) Run() {
|
func (s *friendServer) Run() {
|
||||||
operationID := utils.OperationIDGenerator()
|
operationID := utils.OperationIDGenerator()
|
||||||
log.NewInfo(operationID, "friendServer run...")
|
log.NewInfo(operationID, "friendServer run...")
|
||||||
listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port)
|
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"math/big"
|
"math/big"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -45,6 +46,24 @@ func GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sd
|
|||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetUsername(ctx context.Context, userIDs []string) (map[string]string, error) {
|
||||||
|
if len(userIDs) == 0 {
|
||||||
|
return map[string]string{}, nil
|
||||||
|
}
|
||||||
|
users, err := GetPublicUserInfo(ctx, userIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if ids := utils.Single(userIDs, utils.Slice(users, func(e *sdkws.PublicUserInfo) string {
|
||||||
|
return e.UserID
|
||||||
|
})); len(ids) > 0 {
|
||||||
|
return nil, constant.ErrUserIDNotFound.Wrap(strings.Join(ids, ","))
|
||||||
|
}
|
||||||
|
return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, string) {
|
||||||
|
return e.UserID, e.Nickname
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
func GroupNotification(ctx context.Context, groupID string) {
|
func GroupNotification(ctx context.Context, groupID string) {
|
||||||
var conversationReq pbConversation.ModifyConversationFieldReq
|
var conversationReq pbConversation.ModifyConversationFieldReq
|
||||||
conversation := pbConversation.Conversation{
|
conversation := pbConversation.Conversation{
|
||||||
|
@ -159,6 +159,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
|||||||
group.GroupID = genGroupID(ctx, req.GroupInfo.GroupID)
|
group.GroupID = genGroupID(ctx, req.GroupInfo.GroupID)
|
||||||
joinGroup := func(userID string, roleLevel int32) error {
|
joinGroup := func(userID string, roleLevel int32) error {
|
||||||
groupMember := PbToDbGroupMember(userMap[userID])
|
groupMember := PbToDbGroupMember(userMap[userID])
|
||||||
|
groupMember.Nickname = ""
|
||||||
groupMember.GroupID = group.GroupID
|
groupMember.GroupID = group.GroupID
|
||||||
groupMember.RoleLevel = roleLevel
|
groupMember.RoleLevel = roleLevel
|
||||||
groupMember.OperatorUserID = tracelog.GetOpUserID(ctx)
|
groupMember.OperatorUserID = tracelog.GetOpUserID(ctx)
|
||||||
@ -240,7 +241,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
|
|||||||
resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationTb.GroupModel) string {
|
resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationTb.GroupModel) string {
|
||||||
return group.GroupID
|
return group.GroupID
|
||||||
}), func(group *relationTb.GroupModel) *open_im_sdk.GroupInfo {
|
}), func(group *relationTb.GroupModel) *open_im_sdk.GroupInfo {
|
||||||
return DbToPbGroupInfo(group, ownerMap[group.GroupID].UserID, uint32(groupMemberNum[group.GroupID]))
|
return DbToPbGroupInfo(group, ownerMap[group.GroupID].UserID, groupMemberNum[group.GroupID])
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@ -321,6 +322,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
|||||||
var groupMembers []*relationTb.GroupMemberModel
|
var groupMembers []*relationTb.GroupMemberModel
|
||||||
for _, userID := range req.InvitedUserIDs {
|
for _, userID := range req.InvitedUserIDs {
|
||||||
member := PbToDbGroupMember(userMap[userID])
|
member := PbToDbGroupMember(userMap[userID])
|
||||||
|
member.Nickname = ""
|
||||||
member.GroupID = req.GroupID
|
member.GroupID = req.GroupID
|
||||||
member.RoleLevel = constant.GroupOrdinaryUsers
|
member.RoleLevel = constant.GroupOrdinaryUsers
|
||||||
member.OperatorUserID = opUserID
|
member.OperatorUserID = opUserID
|
||||||
@ -352,7 +354,16 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) {
|
||||||
|
return e.UserID, e.Nickname == ""
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
||||||
|
if e.Nickname == "" {
|
||||||
|
e.Nickname = nameMap[e.UserID]
|
||||||
|
}
|
||||||
return DbToPbGroupMembersCMSResp(e)
|
return DbToPbGroupMembersCMSResp(e)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -365,7 +376,16 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.Total = total
|
resp.Total = total
|
||||||
|
nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) {
|
||||||
|
return e.UserID, e.Nickname == ""
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
||||||
|
if e.Nickname == "" {
|
||||||
|
e.Nickname = nameMap[e.UserID]
|
||||||
|
}
|
||||||
return DbToPbGroupMembersCMSResp(e)
|
return DbToPbGroupMembersCMSResp(e)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -450,7 +470,16 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) {
|
||||||
|
return e.UserID, e.Nickname == ""
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
||||||
|
if e.Nickname == "" {
|
||||||
|
e.Nickname = nameMap[e.UserID]
|
||||||
|
}
|
||||||
return DbToPbGroupMembersCMSResp(e)
|
return DbToPbGroupMembersCMSResp(e)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -801,7 +830,16 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGr
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.Total = total
|
resp.Total = total
|
||||||
|
nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) {
|
||||||
|
return e.UserID, e.Nickname == ""
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
||||||
|
if e.Nickname == "" {
|
||||||
|
e.Nickname = nameMap[e.UserID]
|
||||||
|
}
|
||||||
return DbToPbGroupMembersCMSResp(e)
|
return DbToPbGroupMembersCMSResp(e)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -1011,8 +1049,8 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = s.GroupInterface.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *controller.BatchUpdateGroupMember {
|
err = s.GroupInterface.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbGroup.SetGroupMemberInfo) *relationTb.BatchUpdateGroupMember {
|
||||||
return &controller.BatchUpdateGroupMember{
|
return &relationTb.BatchUpdateGroupMember{
|
||||||
GroupID: e.GroupID,
|
GroupID: e.GroupID,
|
||||||
UserID: e.UserID,
|
UserID: e.UserID,
|
||||||
Map: UpdateGroupMemberMap(e),
|
Map: UpdateGroupMemberMap(e),
|
||||||
@ -1053,7 +1091,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get
|
|||||||
}
|
}
|
||||||
resp.GroupAbstractInfos = utils.Slice(groups, func(group *relationTb.GroupModel) *pbGroup.GroupAbstractInfo {
|
resp.GroupAbstractInfos = utils.Slice(groups, func(group *relationTb.GroupModel) *pbGroup.GroupAbstractInfo {
|
||||||
users := groupUserMap[group.GroupID]
|
users := groupUserMap[group.GroupID]
|
||||||
return DbToPbGroupAbstractInfo(group.GroupID, uint32(len(users.UserIDs)), users.Hash)
|
return DbToPbGroupAbstractInfo(group.GroupID, users.MemberNum, users.Hash)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@ -1067,7 +1105,16 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
nameMap, err := GetUsername(ctx, utils.Filter(members, func(e *relationTb.GroupMemberModel) (string, bool) {
|
||||||
|
return e.UserID, e.Nickname == ""
|
||||||
|
}))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo {
|
||||||
|
if e.Nickname == "" {
|
||||||
|
e.Nickname = nameMap[e.UserID]
|
||||||
|
}
|
||||||
return DbToPbGroupMembersCMSResp(e)
|
return DbToPbGroupMembersCMSResp(e)
|
||||||
})
|
})
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -1075,10 +1122,10 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge
|
|||||||
|
|
||||||
func (s *groupServer) GetGroupMemberUserID(ctx context.Context, req *pbGroup.GetGroupMemberUserIDReq) (*pbGroup.GetGroupMemberUserIDResp, error) {
|
func (s *groupServer) GetGroupMemberUserID(ctx context.Context, req *pbGroup.GetGroupMemberUserIDReq) (*pbGroup.GetGroupMemberUserIDResp, error) {
|
||||||
resp := &pbGroup.GetGroupMemberUserIDResp{}
|
resp := &pbGroup.GetGroupMemberUserIDResp{}
|
||||||
userIDs, err := s.GroupInterface.FindGroupMemberUserID(ctx, req.GroupID)
|
var err error
|
||||||
|
resp.UserIDs, err = s.GroupInterface.FindGroupMemberUserID(ctx, req.GroupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp.UserIDs = userIDs
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
@ -25,12 +25,12 @@ import (
|
|||||||
type userServer struct {
|
type userServer struct {
|
||||||
rpcPort int
|
rpcPort int
|
||||||
rpcRegisterName string
|
rpcRegisterName string
|
||||||
*rpc_server.RpcServer
|
*rpcserver.RpcServer
|
||||||
controller.UserInterface
|
controller.UserInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUserServer(port int) *userServer {
|
func NewUserServer(port int) *userServer {
|
||||||
r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
r, err := rpcserver.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -52,7 +52,7 @@ func NewUserServer(port int) *userServer {
|
|||||||
func (s *userServer) Run() {
|
func (s *userServer) Run() {
|
||||||
operationID := utils.OperationIDGenerator()
|
operationID := utils.OperationIDGenerator()
|
||||||
log.NewInfo(operationID, "rpc user start...")
|
log.NewInfo(operationID, "rpc user start...")
|
||||||
listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port)
|
listener, address, err := rpcserver.GetTcpListen(config.Config.ListenIP, s.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
5
pkg/common/db/cache/extend_msg_set.go
vendored
5
pkg/common/db/cache/extend_msg_set.go
vendored
@ -9,6 +9,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
extendMsgSetCache = "EXTEND_MSG_SET_CACHE:"
|
||||||
|
extendMsgCache = "EXTEND_MSG_CACHE:"
|
||||||
|
)
|
||||||
|
|
||||||
type ExtendMsgSetCache struct {
|
type ExtendMsgSetCache struct {
|
||||||
expireTime time.Duration
|
expireTime time.Duration
|
||||||
rcClient *rockscache.Client
|
rcClient *rockscache.Client
|
||||||
|
111
pkg/common/db/cache/group.go
vendored
111
pkg/common/db/cache/group.go
vendored
@ -1,7 +1,6 @@
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
"Open_IM/pkg/common/db/relation"
|
"Open_IM/pkg/common/db/relation"
|
||||||
relationTb "Open_IM/pkg/common/db/table/relation"
|
relationTb "Open_IM/pkg/common/db/table/relation"
|
||||||
"Open_IM/pkg/common/db/unrelation"
|
"Open_IM/pkg/common/db/unrelation"
|
||||||
@ -12,8 +11,8 @@ import (
|
|||||||
"github.com/dtm-labs/rockscache"
|
"github.com/dtm-labs/rockscache"
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"math/big"
|
"math/big"
|
||||||
"sort"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -117,22 +116,6 @@ func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (gro
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
|
||||||
defer func() {
|
|
||||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
|
||||||
}()
|
|
||||||
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
|
||||||
for _, groupID := range groupIDs {
|
|
||||||
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// userJoinSuperGroup
|
// userJoinSuperGroup
|
||||||
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
@ -160,43 +143,18 @@ func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID str
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
//// groupMembersHash
|
|
||||||
//func (g *GroupCacheRedis) GetGroupsMembersHash(ctx context.Context, groupIDs []string) (map[string]uint64, error) {
|
|
||||||
// return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, "")
|
|
||||||
//}
|
|
||||||
|
|
||||||
// groupMembersHash
|
// groupMembersHash
|
||||||
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
|
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
|
||||||
generateHash := func() (string, error) {
|
return GetCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
|
||||||
groupInfo, err := g.GetGroupInfo(ctx, groupID)
|
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return 0, err
|
||||||
}
|
|
||||||
if groupInfo.Status == constant.GroupStatusDismissed {
|
|
||||||
return "0", nil
|
|
||||||
}
|
|
||||||
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
sort.Strings(groupMemberIDList)
|
|
||||||
var all string
|
|
||||||
for _, v := range groupMemberIDList {
|
|
||||||
all += v
|
|
||||||
}
|
}
|
||||||
|
utils.Sort(userIDs, true)
|
||||||
bi := big.NewInt(0)
|
bi := big.NewInt(0)
|
||||||
bi.SetString(utils.Md5(all)[0:8], 16)
|
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
|
||||||
return strconv.Itoa(int(bi.Uint64())), nil
|
return bi.Uint64(), nil
|
||||||
}
|
})
|
||||||
defer func() {
|
|
||||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64)
|
|
||||||
}()
|
|
||||||
hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash)
|
|
||||||
if err != nil {
|
|
||||||
return 0, utils.Wrap(err, "fetch failed")
|
|
||||||
}
|
|
||||||
hashCode, err := strconv.Atoi(hashCodeStr)
|
|
||||||
return uint64(hashCode), err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
|
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
|
||||||
@ -207,41 +165,10 @@ func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
// groupMemberIDs
|
// groupMemberIDs
|
||||||
// from redis
|
|
||||||
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
|
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
|
||||||
f := func() (string, error) {
|
return GetCache(ctx, g.rcClient, g.getGroupMemberIDsKey(groupID), g.expireTime, func(ctx context.Context) ([]string, error) {
|
||||||
groupInfo, err := g.GetGroupInfo(ctx, groupID)
|
return g.groupMember.FindMemberUserID(ctx, groupID)
|
||||||
if err != nil {
|
})
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
var groupMemberIDList []string
|
|
||||||
if groupInfo.GroupType == constant.SuperGroup {
|
|
||||||
superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
groupMemberIDList = superGroup.MemberIDList
|
|
||||||
} else {
|
|
||||||
groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bytes, err := json.Marshal(groupMemberIDList)
|
|
||||||
if err != nil {
|
|
||||||
return "", utils.Wrap(err, "")
|
|
||||||
}
|
|
||||||
return string(bytes), nil
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs)
|
|
||||||
}()
|
|
||||||
groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs)
|
|
||||||
return groupMemberIDs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
|
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
|
||||||
@ -389,3 +316,19 @@ func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string)
|
|||||||
}()
|
}()
|
||||||
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
|
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
||||||
|
defer func() {
|
||||||
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
||||||
|
}()
|
||||||
|
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
||||||
|
for _, groupID := range groupIDs {
|
||||||
|
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
72
pkg/common/db/cache/rockscache.go
vendored
72
pkg/common/db/cache/rockscache.go
vendored
@ -1,42 +1,15 @@
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"Open_IM/pkg/common/constant"
|
|
||||||
"Open_IM/pkg/common/db/relation"
|
|
||||||
"Open_IM/pkg/common/log"
|
|
||||||
"Open_IM/pkg/common/tracelog"
|
|
||||||
"Open_IM/pkg/utils"
|
"Open_IM/pkg/utils"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"math/big"
|
"github.com/dtm-labs/rockscache"
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
//userInfoCache = "USER_INFO_CACHE:"
|
|
||||||
//friendRelationCache = "FRIEND_RELATION_CACHE:"
|
|
||||||
blackListCache = "BLACK_LIST_CACHE:"
|
|
||||||
//groupCache = "GROUP_CACHE:"
|
|
||||||
//groupInfoCache = "GROUP_INFO_CACHE:"
|
|
||||||
//groupOwnerIDCache = "GROUP_OWNER_ID:"
|
|
||||||
//joinedGroupListCache = "JOINED_GROUP_LIST_CACHE:"
|
|
||||||
//groupMemberInfoCache = "GROUP_MEMBER_INFO_CACHE:"
|
|
||||||
//groupAllMemberInfoCache = "GROUP_ALL_MEMBER_INFO_CACHE:"
|
|
||||||
//allFriendInfoCache = "ALL_FRIEND_INFO_CACHE:"
|
|
||||||
//joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:"
|
|
||||||
//groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:"
|
|
||||||
//groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:"
|
|
||||||
conversationCache = "CONVERSATION_CACHE:"
|
|
||||||
conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:"
|
|
||||||
|
|
||||||
extendMsgSetCache = "EXTEND_MSG_SET_CACHE:"
|
|
||||||
extendMsgCache = "EXTEND_MSG_CACHE:"
|
|
||||||
)
|
|
||||||
|
|
||||||
const scanCount = 3000
|
const scanCount = 3000
|
||||||
const RandomExpireAdjustment = 0.2
|
|
||||||
|
|
||||||
func (rc *RcClient) DelKeys() {
|
func (rc *RcClient) DelKeys() {
|
||||||
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache,
|
for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE", groupOwnerIDCache, joinedGroupListCache,
|
||||||
@ -68,3 +41,44 @@ func (rc *RcClient) DelKeys() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
|
||||||
|
var t T
|
||||||
|
var write bool
|
||||||
|
v, err := rcClient.Fetch(key, expire, func() (s string, err error) {
|
||||||
|
t, err = fn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
bs, err := json.Marshal(t)
|
||||||
|
if err != nil {
|
||||||
|
return "", utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
write = true
|
||||||
|
return string(bs), nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return t, err
|
||||||
|
}
|
||||||
|
if write {
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
err = json.Unmarshal([]byte(v), &t)
|
||||||
|
if err != nil {
|
||||||
|
return t, utils.Wrap(err, "")
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) {
|
||||||
|
rs := make([]T, 0, len(list))
|
||||||
|
for _, e := range list {
|
||||||
|
r, err := fn(ctx, e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rs = append(rs, r)
|
||||||
|
}
|
||||||
|
return rs, nil
|
||||||
|
}
|
||||||
|
49
pkg/common/db/cache/utils.go
vendored
49
pkg/common/db/cache/utils.go
vendored
@ -1,49 +0,0 @@
|
|||||||
package cache
|
|
||||||
|
|
||||||
import (
|
|
||||||
"Open_IM/pkg/utils"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"github.com/dtm-labs/rockscache"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
|
|
||||||
var t T
|
|
||||||
var write bool
|
|
||||||
v, err := rcClient.Fetch(key, expire, func() (s string, err error) {
|
|
||||||
t, err = fn(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
bs, err := json.Marshal(t)
|
|
||||||
if err != nil {
|
|
||||||
return "", utils.Wrap(err, "")
|
|
||||||
}
|
|
||||||
write = true
|
|
||||||
return string(bs), nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return t, err
|
|
||||||
}
|
|
||||||
if write {
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
err = json.Unmarshal([]byte(v), &t)
|
|
||||||
if err != nil {
|
|
||||||
return t, utils.Wrap(err, "")
|
|
||||||
}
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetCacheFor[E any, T any](ctx context.Context, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) {
|
|
||||||
rs := make([]T, 0, len(list))
|
|
||||||
for _, e := range list {
|
|
||||||
r, err := fn(ctx, e)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
rs = append(rs, r)
|
|
||||||
}
|
|
||||||
return rs, nil
|
|
||||||
}
|
|
@ -21,17 +21,6 @@ import (
|
|||||||
|
|
||||||
//type GroupInterface GroupDataBaseInterface
|
//type GroupInterface GroupDataBaseInterface
|
||||||
|
|
||||||
type BatchUpdateGroupMember struct {
|
|
||||||
GroupID string
|
|
||||||
UserID string
|
|
||||||
Map map[string]any
|
|
||||||
}
|
|
||||||
|
|
||||||
type GroupSimpleUserID struct {
|
|
||||||
Hash uint64
|
|
||||||
UserIDs []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type GroupInterface interface {
|
type GroupInterface interface {
|
||||||
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
|
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
|
||||||
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
|
||||||
@ -48,11 +37,11 @@ type GroupInterface interface {
|
|||||||
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
||||||
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
|
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
|
||||||
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||||
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error)
|
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
|
||||||
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
|
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
|
||||||
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
|
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
|
||||||
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
|
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
|
||||||
UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error
|
UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error
|
||||||
// GroupRequest
|
// GroupRequest
|
||||||
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
|
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
|
||||||
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
||||||
@ -132,7 +121,7 @@ func (g *GroupController) DeleteGroupMember(ctx context.Context, groupID string,
|
|||||||
return g.database.DeleteGroupMember(ctx, groupID, userIDs)
|
return g.database.DeleteGroupMember(ctx, groupID, userIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupController) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) {
|
func (g *GroupController) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||||
return g.database.MapGroupMemberUserID(ctx, groupIDs)
|
return g.database.MapGroupMemberUserID(ctx, groupIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,7 +133,7 @@ func (g *GroupController) TransferGroupOwner(ctx context.Context, groupID string
|
|||||||
return g.database.TransferGroupOwner(ctx, groupID, oldOwnerUserID, newOwnerUserID, roleLevel)
|
return g.database.TransferGroupOwner(ctx, groupID, oldOwnerUserID, newOwnerUserID, roleLevel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupController) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error {
|
func (g *GroupController) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
|
||||||
return g.database.UpdateGroupMembers(ctx, data)
|
return g.database.UpdateGroupMembers(ctx, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,11 +193,11 @@ type GroupDataBaseInterface interface {
|
|||||||
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
SearchGroupMember(ctx context.Context, keyword string, groupIDs []string, userIDs []string, roleLevels []int32, pageNumber, showNumber int32) (uint32, []*relationTb.GroupMemberModel, error)
|
||||||
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
|
HandlerGroupRequest(ctx context.Context, groupID string, userID string, handledMsg string, handleResult int32, member *relationTb.GroupMemberModel) error
|
||||||
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error
|
||||||
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error)
|
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error)
|
||||||
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
|
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
|
||||||
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
|
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
|
||||||
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
|
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
|
||||||
UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error
|
UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error
|
||||||
// GroupRequest
|
// GroupRequest
|
||||||
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
|
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
|
||||||
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
|
||||||
@ -393,20 +382,20 @@ func (g *GroupDataBase) DeleteGroupMember(ctx context.Context, groupID string, u
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*GroupSimpleUserID, error) {
|
func (g *GroupDataBase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*relationTb.GroupSimpleUserID, error) {
|
||||||
mapGroupUserIDs, err := g.groupMemberDB.FindJoinUserID(ctx, groupIDs)
|
mapGroupUserIDs, err := g.groupMemberDB.FindJoinUserID(ctx, groupIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res := make(map[string]*GroupSimpleUserID)
|
res := make(map[string]*relationTb.GroupSimpleUserID)
|
||||||
for _, groupID := range groupIDs {
|
for _, groupID := range groupIDs {
|
||||||
users := &GroupSimpleUserID{
|
userIDs := mapGroupUserIDs[groupID]
|
||||||
UserIDs: mapGroupUserIDs[groupID],
|
users := &relationTb.GroupSimpleUserID{}
|
||||||
}
|
if len(userIDs) > 0 {
|
||||||
if len(users.UserIDs) > 0 {
|
utils.Sort(userIDs, true)
|
||||||
utils.Sort(users.UserIDs, true)
|
|
||||||
bi := big.NewInt(0)
|
bi := big.NewInt(0)
|
||||||
bi.SetString(utils.Md5(strings.Join(users.UserIDs, ";"))[0:8], 16)
|
bi.SetString(utils.Md5(strings.Join(userIDs, ";"))[0:8], 16)
|
||||||
|
users.Hash = bi.Uint64()
|
||||||
}
|
}
|
||||||
res[groupID] = users
|
res[groupID] = users
|
||||||
}
|
}
|
||||||
@ -452,7 +441,7 @@ func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID string, u
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error {
|
func (g *GroupDataBase) UpdateGroupMembers(ctx context.Context, data []*relationTb.BatchUpdateGroupMember) error {
|
||||||
return g.db.Transaction(func(tx *gorm.DB) error {
|
return g.db.Transaction(func(tx *gorm.DB) error {
|
||||||
for _, item := range data {
|
for _, item := range data {
|
||||||
if err := g.groupMemberDB.Update(ctx, item.GroupID, item.UserID, item.Map, tx); err != nil {
|
if err := g.groupMemberDB.Update(ctx, item.GroupID, item.UserID, item.Map, tx); err != nil {
|
||||||
@ -487,25 +476,25 @@ func (g *GroupDataBase) FindJoinSuperGroup(ctx context.Context, userID string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error {
|
func (g *GroupDataBase) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDList []string) error {
|
||||||
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
|
return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error {
|
||||||
return s.CreateSuperGroup(ctx, groupID, initMemberIDList, tx)
|
return g.mongoDB.CreateSuperGroup(ctx, groupID, initMemberIDList, tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) DeleteSuperGroup(ctx context.Context, groupID string) error {
|
func (g *GroupDataBase) DeleteSuperGroup(ctx context.Context, groupID string) error {
|
||||||
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
|
return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error {
|
||||||
return s.DeleteSuperGroup(ctx, groupID, tx)
|
return g.mongoDB.DeleteSuperGroup(ctx, groupID, tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
|
func (g *GroupDataBase) DeleteSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
|
||||||
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
|
return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error {
|
||||||
return s.RemoverUserFromSuperGroup(ctx, groupID, userIDs, tx)
|
return g.mongoDB.RemoverUserFromSuperGroup(ctx, groupID, userIDs, tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GroupDataBase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
|
func (g *GroupDataBase) CreateSuperGroupMember(ctx context.Context, groupID string, userIDs []string) error {
|
||||||
return g.mongoDB.Transaction(ctx, func(s unrelationTb.SuperGroupModelInterface, tx any) error {
|
return unrelation.MongoTransaction(ctx, g.mongoDB.MgoClient, func(tx mongo.SessionContext) error {
|
||||||
return s.AddUserToSuperGroup(ctx, groupID, userIDs, tx)
|
return g.mongoDB.AddUserToSuperGroup(ctx, groupID, userIDs, tx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
21
pkg/common/db/controller/msg.go
Normal file
21
pkg/common/db/controller/msg.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
pbMsg "Open_IM/pkg/proto/msg"
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MsgInterface interface {
|
||||||
|
BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||||
|
BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64)
|
||||||
|
DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error)
|
||||||
|
DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error
|
||||||
|
DelMsgBySeqListInOneDoc(ctx context.Context, docID string, seqList []uint32) (unExistSeqList []uint32, err error)
|
||||||
|
ReplaceMsgToBlankByIndex(suffixID string, index int) (replaceMaxSeq uint32, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type MsgDatabaseInterface interface {
|
||||||
|
BatchInsertChat2DB(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq uint64) error
|
||||||
|
BatchInsertChat2Cache(ctx context.Context, insertID string, msgList []*pbMsg.MsgDataToMQ) (error, uint64)
|
||||||
|
DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error)
|
||||||
|
}
|
12
pkg/common/db/table/relation/utils.go
Normal file
12
pkg/common/db/table/relation/utils.go
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
package relation
|
||||||
|
|
||||||
|
type BatchUpdateGroupMember struct {
|
||||||
|
GroupID string
|
||||||
|
UserID string
|
||||||
|
Map map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
type GroupSimpleUserID struct {
|
||||||
|
Hash uint64
|
||||||
|
MemberNum uint32
|
||||||
|
}
|
@ -7,9 +7,9 @@ const (
|
|||||||
CChat = "msg"
|
CChat = "msg"
|
||||||
)
|
)
|
||||||
|
|
||||||
type UserChatModel struct {
|
type UserMsgDocModel struct {
|
||||||
UID string `bson:"uid"`
|
DocID string `bson:"uid"`
|
||||||
Msg []MsgInfoModel `bson:"msg"`
|
Msg []MsgInfoModel `bson:"msg"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MsgInfoModel struct {
|
type MsgInfoModel struct {
|
||||||
@ -17,20 +17,20 @@ type MsgInfoModel struct {
|
|||||||
Msg []byte `bson:"msg"`
|
Msg []byte `bson:"msg"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (UserChatModel) TableName() string {
|
func (UserMsgDocModel) TableName() string {
|
||||||
return CChat
|
return CChat
|
||||||
}
|
}
|
||||||
|
|
||||||
func (UserChatModel) GetSingleGocMsgNum() int {
|
func (UserMsgDocModel) GetSingleDocMsgNum() int {
|
||||||
return singleGocMsgNum
|
return singleGocMsgNum
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u UserChatModel) getSeqUid(uid string, seq uint32) string {
|
func (u UserMsgDocModel) getSeqUid(uid string, seq uint32) string {
|
||||||
seqSuffix := seq / singleGocMsgNum
|
seqSuffix := seq / singleGocMsgNum
|
||||||
return u.indexGen(uid, seqSuffix)
|
return u.indexGen(uid, seqSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string {
|
func (u UserMsgDocModel) getSeqUserIDList(userID string, maxSeq uint32) []string {
|
||||||
seqMaxSuffix := maxSeq / singleGocMsgNum
|
seqMaxSuffix := maxSeq / singleGocMsgNum
|
||||||
var seqUserIDList []string
|
var seqUserIDList []string
|
||||||
for i := 0; i <= int(seqMaxSuffix); i++ {
|
for i := 0; i <= int(seqMaxSuffix); i++ {
|
||||||
@ -40,16 +40,16 @@ func (u UserChatModel) getSeqUserIDList(userID string, maxSeq uint32) []string {
|
|||||||
return seqUserIDList
|
return seqUserIDList
|
||||||
}
|
}
|
||||||
|
|
||||||
func (UserChatModel) getSeqSuperGroupID(groupID string, seq uint32) string {
|
func (UserMsgDocModel) getSeqSuperGroupID(groupID string, seq uint32) string {
|
||||||
seqSuffix := seq / singleGocMsgNum
|
seqSuffix := seq / singleGocMsgNum
|
||||||
return superGroupIndexGen(groupID, seqSuffix)
|
return superGroupIndexGen(groupID, seqSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u UserChatModel) GetSeqUid(uid string, seq uint32) string {
|
func (u UserMsgDocModel) GetSeqUid(uid string, seq uint32) string {
|
||||||
return u.getSeqUid(uid, seq)
|
return u.getSeqUid(uid, seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (UserChatModel) getMsgIndex(seq uint32) int {
|
func (UserMsgDocModel) getMsgIndex(seq uint32) int {
|
||||||
seqSuffix := seq / singleGocMsgNum
|
seqSuffix := seq / singleGocMsgNum
|
||||||
var index uint32
|
var index uint32
|
||||||
if seqSuffix == 0 {
|
if seqSuffix == 0 {
|
||||||
@ -60,6 +60,6 @@ func (UserChatModel) getMsgIndex(seq uint32) int {
|
|||||||
return int(index)
|
return int(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (UserChatModel) indexGen(uid string, seqSuffix uint32) string {
|
func (UserMsgDocModel) indexGen(uid string, seqSuffix uint32) string {
|
||||||
return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
|
||||||
}
|
}
|
||||||
|
@ -169,117 +169,3 @@ func (d *db.DataBases) BatchInsertChat2Cache(insertID string, msgList []*pbMsg.M
|
|||||||
}
|
}
|
||||||
return utils.Wrap(err, ""), lastMaxSeq
|
return utils.Wrap(err, ""), lastMaxSeq
|
||||||
}
|
}
|
||||||
|
|
||||||
//func (d *DataBases) BatchInsertChatBoth(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) (error, uint64) {
|
|
||||||
// err, lastMaxSeq := d.BatchInsertChat2Cache(userID, msgList, operationID)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error(operationID, "BatchInsertChat2Cache failed ", err.Error(), userID, len(msgList))
|
|
||||||
// return err, 0
|
|
||||||
// }
|
|
||||||
// for {
|
|
||||||
// if runtime.NumGoroutine() > 50000 {
|
|
||||||
// log.NewWarn(operationID, "too many NumGoroutine ", runtime.NumGoroutine())
|
|
||||||
// time.Sleep(10 * time.Millisecond)
|
|
||||||
// } else {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// return nil, lastMaxSeq
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, operationID string) error {
|
|
||||||
// newTime := getCurrentTimestampByMill()
|
|
||||||
// if len(msgList) > GetSingleGocMsgNum() {
|
|
||||||
// return errors.New("too large")
|
|
||||||
// }
|
|
||||||
// isInit := false
|
|
||||||
// currentMaxSeq, err := d.GetUserMaxSeq(userID)
|
|
||||||
// if err == nil {
|
|
||||||
//
|
|
||||||
// } else if err == go_redis.Nil {
|
|
||||||
// isInit = true
|
|
||||||
// currentMaxSeq = 0
|
|
||||||
// } else {
|
|
||||||
// return utils.Wrap(err, "")
|
|
||||||
// }
|
|
||||||
// var remain uint64
|
|
||||||
// //if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
|
||||||
// // remain = uint64(GetSingleGocMsgNum()-1) - (currentMaxSeq % uint64(GetSingleGocMsgNum()))
|
|
||||||
// //} else {
|
|
||||||
// // remain = uint64(GetSingleGocMsgNum()) - ((currentMaxSeq - (uint64(GetSingleGocMsgNum()) - 1)) % uint64(GetSingleGocMsgNum()))
|
|
||||||
// //}
|
|
||||||
//
|
|
||||||
// blk0 := uint64(GetSingleGocMsgNum() - 1)
|
|
||||||
// if currentMaxSeq < uint64(GetSingleGocMsgNum()) {
|
|
||||||
// remain = blk0 - currentMaxSeq
|
|
||||||
// } else {
|
|
||||||
// excludeBlk0 := currentMaxSeq - blk0
|
|
||||||
// remain = (uint64(GetSingleGocMsgNum()) - (excludeBlk0 % uint64(GetSingleGocMsgNum()))) % uint64(GetSingleGocMsgNum())
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// insertCounter := uint64(0)
|
|
||||||
// msgListToMongo := make([]MsgInfo, 0)
|
|
||||||
// msgListToMongoNext := make([]MsgInfo, 0)
|
|
||||||
// seqUid := ""
|
|
||||||
// seqUidNext := ""
|
|
||||||
// log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
|
|
||||||
// //4998 remain ==1
|
|
||||||
// //4999
|
|
||||||
// for _, m := range msgList {
|
|
||||||
// log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
|
|
||||||
// currentMaxSeq++
|
|
||||||
// sMsg := MsgInfo{}
|
|
||||||
// sMsg.SendTime = m.MsgData.SendTime
|
|
||||||
// m.MsgData.Seq = uint32(currentMaxSeq)
|
|
||||||
// if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
|
|
||||||
// return utils.Wrap(err, "")
|
|
||||||
// }
|
|
||||||
// if isInit {
|
|
||||||
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
|
||||||
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
|
||||||
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// if insertCounter < remain {
|
|
||||||
// msgListToMongo = append(msgListToMongo, sMsg)
|
|
||||||
// insertCounter++
|
|
||||||
// seqUid = getSeqUid(userID, uint32(currentMaxSeq))
|
|
||||||
// log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
|
||||||
// } else {
|
|
||||||
// msgListToMongoNext = append(msgListToMongoNext, sMsg)
|
|
||||||
// seqUidNext = getSeqUid(userID, uint32(currentMaxSeq))
|
|
||||||
// log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// // ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
|
||||||
//
|
|
||||||
// ctx := context.Background()
|
|
||||||
// c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
|
|
||||||
//
|
|
||||||
// if seqUid != "" {
|
|
||||||
// filter := bson.M{"uid": seqUid}
|
|
||||||
// log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo)
|
|
||||||
// err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgListToMongo}}}).Err()
|
|
||||||
// if err != nil {
|
|
||||||
// log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
|
|
||||||
// return utils.Wrap(err, "")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// if seqUidNext != "" {
|
|
||||||
// filter := bson.M{"uid": seqUidNext}
|
|
||||||
// sChat := UserChat{}
|
|
||||||
// sChat.UID = seqUidNext
|
|
||||||
// sChat.Msg = msgListToMongoNext
|
|
||||||
// log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext)
|
|
||||||
// if _, err = c.InsertOne(ctx, &sChat); err != nil {
|
|
||||||
// log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
|
|
||||||
// return utils.Wrap(err, "")
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// log.NewWarn(operationID, "batch mgo cost time ", getCurrentTimestampByMill()-newTime, userID, len(msgList))
|
|
||||||
// return utils.Wrap(d.SetUserMaxSeq(userID, uint64(currentMaxSeq)), "")
|
|
||||||
//}
|
|
||||||
|
|
||||||
//func (d *DataBases)setMessageToCache(msgList []*pbMsg.MsgDataToMQ, uid string) (err error) {
|
|
||||||
//
|
|
||||||
//}
|
|
||||||
|
@ -74,8 +74,8 @@ func (m *Mongo) CreateTagIndex() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Mongo) CreateMsgIndex() {
|
func (m *Mongo) CreateMsgIndex() {
|
||||||
if err := m.createMongoIndex(cChat, false, "uid"); err != nil {
|
if err := m.createMongoIndex(unrelation.CChat, false, "uid"); err != nil {
|
||||||
fmt.Println(err.Error() + " index create failed " + cChat + " uid, please create index by yourself in field uid")
|
fmt.Println(err.Error() + " index create failed " + unrelation.CChat + " uid, please create index by yourself in field uid")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,3 +150,12 @@ func MongoTransaction(ctx context.Context, mgo *mongo.Client, fn func(ctx mongo.
|
|||||||
}
|
}
|
||||||
return utils.Wrap(sess.CommitTransaction(sCtx), "")
|
return utils.Wrap(sess.CommitTransaction(sCtx), "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getTxCtx(ctx context.Context, tx []any) context.Context {
|
||||||
|
if len(tx) > 0 {
|
||||||
|
if ctx, ok := tx[0].(mongo.SessionContext); ok {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
@ -27,6 +27,16 @@ func NewMsgMongoDriver(mgoDB *mongo.Database) *MsgMongoDriver {
|
|||||||
return &MsgMongoDriver{mgoDB: mgoDB, MsgCollection: mgoDB.Collection(unrelation.CChat)}
|
return &MsgMongoDriver{mgoDB: mgoDB, MsgCollection: mgoDB.Collection(unrelation.CChat)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MsgMongoDriver) FindOneAndUpdate(ctx context.Context, filter, update, output interface{}, opts ...*options.FindOneAndUpdateOptions) error {
|
||||||
|
return m.MsgCollection.FindOneAndUpdate(ctx, filter, update, opts...).Decode(output)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MsgMongoDriver) UpdateOne(ctx context.Context, filter, update interface{}, opts ...*options.UpdateOptions) error {
|
||||||
|
_, err := m.MsgCollection.UpdateOne(ctx, filter, update, opts...)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// database controller
|
||||||
func (m *MsgMongoDriver) DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) {
|
func (m *MsgMongoDriver) DelMsgBySeqList(ctx context.Context, userID string, seqList []uint32) (totalUnExistSeqList []uint32, err error) {
|
||||||
sortkeys.Uint32s(seqList)
|
sortkeys.Uint32s(seqList)
|
||||||
suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 {
|
suffixUserID2SubSeqList := func(uid string, seqList []uint32) map[string][]uint32 {
|
||||||
@ -73,6 +83,7 @@ func (m *MsgMongoDriver) DelMsgBySeqListInOneDoc(ctx context.Context, suffixUser
|
|||||||
return unexistSeqList, nil
|
return unexistSeqList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// database
|
||||||
func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error {
|
func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList []uint32) error {
|
||||||
sortkeys.Uint32s(seqList)
|
sortkeys.Uint32s(seqList)
|
||||||
seqMsgs, err := d.GetMsgBySeqListMongo2(ctx, uid, seqList)
|
seqMsgs, err := d.GetMsgBySeqListMongo2(ctx, uid, seqList)
|
||||||
@ -88,6 +99,7 @@ func (m *MsgMongoDriver) DelMsgLogic(ctx context.Context, uid string, seqList []
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// model
|
||||||
func (m *MsgMongoDriver) ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error {
|
func (m *MsgMongoDriver) ReplaceMsgByIndex(ctx context.Context, suffixUserID string, msg *sdkws.MsgData, seqIndex int) error {
|
||||||
log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
|
log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
|
||||||
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
|
||||||
|
@ -111,15 +111,6 @@ type SuperGroupMongoDriver struct {
|
|||||||
// panic("implement me")
|
// panic("implement me")
|
||||||
// }
|
// }
|
||||||
|
|
||||||
func (s *SuperGroupMongoDriver) getTxCtx(ctx context.Context, tx []any) context.Context {
|
|
||||||
if len(tx) > 0 {
|
|
||||||
if ctx, ok := tx[0].(mongo.SessionContext); ok {
|
|
||||||
return ctx
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ctx
|
|
||||||
}
|
|
||||||
|
|
||||||
//func (s *SuperGroupMongoDriver) Transaction(ctx context.Context, fn func(ctx mongo.SessionContext) error) error {
|
//func (s *SuperGroupMongoDriver) Transaction(ctx context.Context, fn func(ctx mongo.SessionContext) error) error {
|
||||||
// sess, err := s.MgoClient.StartSession()
|
// sess, err := s.MgoClient.StartSession()
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
|
@ -144,11 +144,12 @@ func SliceSetAny[E any, K comparable](es []E, fn func(e E) K) map[K]struct{} {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func Filter[E any](es []E, fn func(e E) bool) []E {
|
func Filter[E, T any](es []E, fn func(e E) (T, bool)) []T {
|
||||||
rs := make([]E, 0, len(es))
|
rs := make([]T, 0, len(es))
|
||||||
for i := 0; i < len(es); i++ {
|
for i := 0; i < len(es); i++ {
|
||||||
if e := es[i]; fn(e) {
|
e := es[i]
|
||||||
rs = append(rs, e)
|
if t, ok := fn(e); ok {
|
||||||
|
rs = append(rs, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rs
|
return rs
|
||||||
|
Loading…
x
Reference in New Issue
Block a user