mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	Merge branch 'main' of github.com:openimsdk/open-im-server into fix/batch-insert-group-member
This commit is contained in:
		
						commit
						7e9f969a3c
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -12,7 +12,7 @@ require (
 | 
				
			|||||||
	github.com/gorilla/websocket v1.5.1
 | 
						github.com/gorilla/websocket v1.5.1
 | 
				
			||||||
	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
						github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
				
			||||||
	github.com/mitchellh/mapstructure v1.5.0
 | 
						github.com/mitchellh/mapstructure v1.5.0
 | 
				
			||||||
	github.com/openimsdk/protocol v0.0.69-alpha.32
 | 
						github.com/openimsdk/protocol v0.0.69-alpha.38
 | 
				
			||||||
	github.com/openimsdk/tools v0.0.49-alpha.51
 | 
						github.com/openimsdk/tools v0.0.49-alpha.51
 | 
				
			||||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
						github.com/pkg/errors v0.9.1 // indirect
 | 
				
			||||||
	github.com/prometheus/client_golang v1.18.0
 | 
						github.com/prometheus/client_golang v1.18.0
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
				
			|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
					github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
				
			||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 | 
					github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 | 
				
			||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
					github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.69-alpha.32 h1:uogVpyzas/YlrKvLsM/FVRwQSE4ELkaRct+//frOiyk=
 | 
					github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.69-alpha.32/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
					github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 | 
				
			||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
					github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 | 
				
			||||||
 | 
				
			|||||||
@ -75,8 +75,8 @@ type Client struct {
 | 
				
			|||||||
	token          string
 | 
						token          string
 | 
				
			||||||
	hbCtx          context.Context
 | 
						hbCtx          context.Context
 | 
				
			||||||
	hbCancel       context.CancelFunc
 | 
						hbCancel       context.CancelFunc
 | 
				
			||||||
	subLock        sync.Mutex
 | 
						subLock        *sync.Mutex
 | 
				
			||||||
	subUserIDs     map[string]struct{}
 | 
						subUserIDs     map[string]struct{} // client conn subscription list
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ResetClient updates the client's state with new connection and context information.
 | 
					// ResetClient updates the client's state with new connection and context information.
 | 
				
			||||||
@ -94,14 +94,21 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
 | 
				
			|||||||
	c.closedErr = nil
 | 
						c.closedErr = nil
 | 
				
			||||||
	c.token = ctx.GetToken()
 | 
						c.token = ctx.GetToken()
 | 
				
			||||||
	c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
 | 
						c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
 | 
				
			||||||
 | 
						c.subLock = new(sync.Mutex)
 | 
				
			||||||
 | 
						if c.subUserIDs != nil {
 | 
				
			||||||
 | 
							clear(c.subUserIDs)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						c.subUserIDs = make(map[string]struct{})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) pingHandler(_ string) error {
 | 
					func (c *Client) pingHandler(appData string) error {
 | 
				
			||||||
	if err := c.conn.SetReadDeadline(pongWait); err != nil {
 | 
						if err := c.conn.SetReadDeadline(pongWait); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return c.writePongMsg()
 | 
						log.ZDebug(c.ctx, "ping Handler Success.", "appData", appData)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return c.writePongMsg(appData)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) pongHandler(_ string) error {
 | 
					func (c *Client) pongHandler(_ string) error {
 | 
				
			||||||
@ -156,7 +163,7 @@ func (c *Client) readMessage() {
 | 
				
			|||||||
			return
 | 
								return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case PingMessage:
 | 
							case PingMessage:
 | 
				
			||||||
			err := c.writePongMsg()
 | 
								err := c.writePongMsg("")
 | 
				
			||||||
			log.ZError(c.ctx, "writePongMsg", err)
 | 
								log.ZError(c.ctx, "writePongMsg", err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case CloseMessage:
 | 
							case CloseMessage:
 | 
				
			||||||
@ -244,13 +251,11 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req *Req) ([]byte,
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) close() {
 | 
					func (c *Client) close() {
 | 
				
			||||||
 | 
						c.w.Lock()
 | 
				
			||||||
 | 
						defer c.w.Unlock()
 | 
				
			||||||
	if c.closed.Load() {
 | 
						if c.closed.Load() {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	c.w.Lock()
 | 
					 | 
				
			||||||
	defer c.w.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	c.closed.Store(true)
 | 
						c.closed.Store(true)
 | 
				
			||||||
	c.conn.Close()
 | 
						c.conn.Close()
 | 
				
			||||||
	c.hbCancel() // Close server-initiated heartbeat.
 | 
						c.hbCancel() // Close server-initiated heartbeat.
 | 
				
			||||||
@ -311,6 +316,14 @@ func (c *Client) KickOnlineMessage() error {
 | 
				
			|||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *Client) PushUserOnlineStatus(data []byte) error {
 | 
				
			||||||
 | 
						resp := Resp{
 | 
				
			||||||
 | 
							ReqIdentifier: WsSubUserOnlineStatus,
 | 
				
			||||||
 | 
							Data:          data,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return c.writeBinaryMsg(resp)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) writeBinaryMsg(resp Resp) error {
 | 
					func (c *Client) writeBinaryMsg(resp Resp) error {
 | 
				
			||||||
	if c.closed.Load() {
 | 
						if c.closed.Load() {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
@ -378,7 +391,7 @@ func (c *Client) writePingMsg() error {
 | 
				
			|||||||
	return c.conn.WriteMessage(PingMessage, nil)
 | 
						return c.conn.WriteMessage(PingMessage, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) writePongMsg() error {
 | 
					func (c *Client) writePongMsg(appData string) error {
 | 
				
			||||||
	if c.closed.Load() {
 | 
						if c.closed.Load() {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@ -391,5 +404,5 @@ func (c *Client) writePongMsg() error {
 | 
				
			|||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return c.conn.WriteMessage(PongMessage, nil)
 | 
						return c.conn.WriteMessage(PongMessage, []byte(appData))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -2,15 +2,11 @@ package msggateway
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"encoding/json"
 | 
					 | 
				
			||||||
	"github.com/openimsdk/protocol/constant"
 | 
					 | 
				
			||||||
	"github.com/openimsdk/protocol/sdkws"
 | 
						"github.com/openimsdk/protocol/sdkws"
 | 
				
			||||||
	"github.com/openimsdk/tools/log"
 | 
						"github.com/openimsdk/tools/log"
 | 
				
			||||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
						"github.com/openimsdk/tools/utils/datautil"
 | 
				
			||||||
	"github.com/openimsdk/tools/utils/idutil"
 | 
					 | 
				
			||||||
	"google.golang.org/protobuf/proto"
 | 
						"google.golang.org/protobuf/proto"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
 | 
					func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userID string, platformIDs []int32) {
 | 
				
			||||||
@ -45,33 +41,19 @@ func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, dat
 | 
				
			|||||||
	return proto.Marshal(&resp)
 | 
						return proto.Marshal(&resp)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type subClient struct {
 | 
					 | 
				
			||||||
	clients map[string]*Client
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func newSubscription() *Subscription {
 | 
					func newSubscription() *Subscription {
 | 
				
			||||||
	return &Subscription{
 | 
						return &Subscription{
 | 
				
			||||||
		userIDs: make(map[string]*subClient),
 | 
							userIDs: make(map[string]*subClient),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Subscription struct {
 | 
					type subClient struct {
 | 
				
			||||||
	lock    sync.RWMutex
 | 
						clients map[string]*Client
 | 
				
			||||||
	userIDs map[string]*subClient
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *Subscription) GetClient(userID string) []*Client {
 | 
					type Subscription struct {
 | 
				
			||||||
	s.lock.RLock()
 | 
						lock    sync.RWMutex
 | 
				
			||||||
	defer s.lock.RUnlock()
 | 
						userIDs map[string]*subClient // subscribe to the user's client connection
 | 
				
			||||||
	cs, ok := s.userIDs[userID]
 | 
					 | 
				
			||||||
	if !ok {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	clients := make([]*Client, 0, len(cs.clients))
 | 
					 | 
				
			||||||
	for _, client := range cs.clients {
 | 
					 | 
				
			||||||
		clients = append(clients, client)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return clients
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *Subscription) DelClient(client *Client) {
 | 
					func (s *Subscription) DelClient(client *Client) {
 | 
				
			||||||
@ -99,6 +81,20 @@ func (s *Subscription) DelClient(client *Client) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *Subscription) GetClient(userID string) []*Client {
 | 
				
			||||||
 | 
						s.lock.RLock()
 | 
				
			||||||
 | 
						defer s.lock.RUnlock()
 | 
				
			||||||
 | 
						cs, ok := s.userIDs[userID]
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						clients := make([]*Client, 0, len(cs.clients))
 | 
				
			||||||
 | 
						for _, client := range cs.clients {
 | 
				
			||||||
 | 
							clients = append(clients, client)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return clients
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
					func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
				
			||||||
	if len(addUserIDs)+len(delUserIDs) == 0 {
 | 
						if len(addUserIDs)+len(delUserIDs) == 0 {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
@ -121,6 +117,7 @@ func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) {
 | 
				
			|||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		client.subUserIDs[userID] = struct{}{}
 | 
							client.subUserIDs[userID] = struct{}{}
 | 
				
			||||||
 | 
							add[userID] = struct{}{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	client.subLock.Unlock()
 | 
						client.subLock.Unlock()
 | 
				
			||||||
	if len(del)+len(add) == 0 {
 | 
						if len(del)+len(add) == 0 {
 | 
				
			||||||
@ -154,28 +151,16 @@ func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, p
 | 
				
			|||||||
	if len(clients) == 0 {
 | 
						if len(clients) == 0 {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	msgContent, err := json.Marshal(platformIDs)
 | 
						onlineStatus, err := proto.Marshal(&sdkws.SubUserOnlineStatusTips{
 | 
				
			||||||
 | 
							Subscribers: []*sdkws.SubUserOnlineStatusElem{{UserID: userID, OnlinePlatformIDs: platformIDs}},
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | 
							log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	now := time.Now().UnixMilli()
 | 
					 | 
				
			||||||
	msgID := idutil.GetMsgIDByMD5(userID)
 | 
					 | 
				
			||||||
	msg := &sdkws.MsgData{
 | 
					 | 
				
			||||||
		SendID:           userID,
 | 
					 | 
				
			||||||
		ClientMsgID:      msgID,
 | 
					 | 
				
			||||||
		ServerMsgID:      msgID,
 | 
					 | 
				
			||||||
		SenderPlatformID: constant.AdminPlatformID,
 | 
					 | 
				
			||||||
		SessionType:      constant.NotificationChatType,
 | 
					 | 
				
			||||||
		ContentType:      constant.UserSubscribeOnlineStatusNotification,
 | 
					 | 
				
			||||||
		Content:          msgContent,
 | 
					 | 
				
			||||||
		SendTime:         now,
 | 
					 | 
				
			||||||
		CreateTime:       now,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, client := range clients {
 | 
						for _, client := range clients {
 | 
				
			||||||
		msg.RecvID = client.UserID
 | 
							if err := client.PushUserOnlineStatus(onlineStatus); err != nil {
 | 
				
			||||||
		if err := client.PushMessage(ctx, msg); err != nil {
 | 
								log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "changePlatformID", platformIDs)
 | 
				
			||||||
			log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -358,9 +358,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
 | 
				
			|||||||
		prommetrics.OnlineUserGauge.Dec()
 | 
							prommetrics.OnlineUserGauge.Dec()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	ws.onlineUserConnNum.Add(-1)
 | 
						ws.onlineUserConnNum.Add(-1)
 | 
				
			||||||
	client.subLock.Lock()
 | 
						ws.subscription.DelClient(client)
 | 
				
			||||||
	clear(client.subUserIDs)
 | 
					 | 
				
			||||||
	client.subLock.Unlock()
 | 
					 | 
				
			||||||
	//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
 | 
						//ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
 | 
				
			||||||
	log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
 | 
						log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num",
 | 
				
			||||||
		ws.onlineUserNum.Load(), "online user conn Num",
 | 
							ws.onlineUserNum.Load(), "online user conn Num",
 | 
				
			||||||
 | 
				
			|||||||
@ -16,6 +16,8 @@ package friend
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -191,10 +193,37 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context
 | 
				
			|||||||
	f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
 | 
						f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) {
 | 
				
			||||||
 | 
						versions := versionctx.GetVersionLog(ctx).Get()
 | 
				
			||||||
 | 
						for _, coll := range versions {
 | 
				
			||||||
 | 
							if coll.Name == collName && coll.Doc.DID == id {
 | 
				
			||||||
 | 
								*version = uint64(coll.Doc.Version)
 | 
				
			||||||
 | 
								*versionID = coll.Doc.ID.Hex()
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FriendNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
 | 
				
			||||||
 | 
						versions := versionctx.GetVersionLog(ctx).Get()
 | 
				
			||||||
 | 
						for _, coll := range versions {
 | 
				
			||||||
 | 
							if coll.Name == collName && coll.Doc.DID == id {
 | 
				
			||||||
 | 
								*version = uint64(coll.Doc.Version)
 | 
				
			||||||
 | 
								*versionID = coll.Doc.ID.Hex()
 | 
				
			||||||
 | 
								for _, elem := range coll.Doc.Logs {
 | 
				
			||||||
 | 
									if elem.EID == relationtb.VersionSortChangeID {
 | 
				
			||||||
 | 
										*sortVersion = uint64(elem.Version)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
 | 
					func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) {
 | 
				
			||||||
	tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
 | 
						tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}}
 | 
				
			||||||
	tips.FromToUserID.FromUserID = fromUserID
 | 
						tips.FromToUserID.FromUserID = fromUserID
 | 
				
			||||||
	tips.FromToUserID.ToUserID = toUserID
 | 
						tips.FromToUserID.ToUserID = toUserID
 | 
				
			||||||
 | 
						f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, database.FriendVersionName, toUserID, &tips.FriendSortVersion)
 | 
				
			||||||
	f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
 | 
						f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -4,6 +4,7 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
 | 
				
			||||||
	"github.com/openimsdk/protocol/sdkws"
 | 
						"github.com/openimsdk/protocol/sdkws"
 | 
				
			||||||
 | 
						"slices"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
 | 
						"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 | 
				
			||||||
@ -52,24 +53,40 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
 | 
				
			|||||||
	if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
 | 
						if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						var sortVersion uint64
 | 
				
			||||||
	opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{
 | 
						opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{
 | 
				
			||||||
		Ctx:             ctx,
 | 
							Ctx:           ctx,
 | 
				
			||||||
		VersionKey:      req.UserID,
 | 
							VersionKey:    req.UserID,
 | 
				
			||||||
		VersionID:       req.VersionID,
 | 
							VersionID:     req.VersionID,
 | 
				
			||||||
		VersionNumber:   req.Version,
 | 
							VersionNumber: req.Version,
 | 
				
			||||||
		Version:         s.db.FindFriendIncrVersion,
 | 
							Version: func(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) {
 | 
				
			||||||
 | 
								vl, err := s.db.FindFriendIncrVersion(ctx, ownerUserID, version, limit)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return nil, err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
 | 
				
			||||||
 | 
									if elem.EID == model.VersionSortChangeID {
 | 
				
			||||||
 | 
										vl.LogLen--
 | 
				
			||||||
 | 
										sortVersion = uint64(elem.Version)
 | 
				
			||||||
 | 
										return true
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return false
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
 | 
								return vl, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		CacheMaxVersion: s.db.FindMaxFriendVersionCache,
 | 
							CacheMaxVersion: s.db.FindMaxFriendVersionCache,
 | 
				
			||||||
		Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
 | 
							Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
 | 
				
			||||||
			return s.getFriend(ctx, req.UserID, ids)
 | 
								return s.getFriend(ctx, req.UserID, ids)
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
 | 
							Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
 | 
				
			||||||
			return &relation.GetIncrementalFriendsResp{
 | 
								return &relation.GetIncrementalFriendsResp{
 | 
				
			||||||
				VersionID: version.ID.Hex(),
 | 
									VersionID:   version.ID.Hex(),
 | 
				
			||||||
				Version:   uint64(version.Version),
 | 
									Version:     uint64(version.Version),
 | 
				
			||||||
				Full:      full,
 | 
									Full:        full,
 | 
				
			||||||
				Delete:    deleteIds,
 | 
									Delete:      deleteIds,
 | 
				
			||||||
				Insert:    insertList,
 | 
									Insert:      insertList,
 | 
				
			||||||
				Update:    updateList,
 | 
									Update:      updateList,
 | 
				
			||||||
 | 
									SortVersion: sortVersion,
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
				
			|||||||
@ -306,6 +306,21 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) {
 | 
				
			||||||
 | 
						versions := versionctx.GetVersionLog(ctx).Get()
 | 
				
			||||||
 | 
						for _, coll := range versions {
 | 
				
			||||||
 | 
							if coll.Name == collName && coll.Doc.DID == id {
 | 
				
			||||||
 | 
								*version = uint64(coll.Doc.Version)
 | 
				
			||||||
 | 
								*versionID = coll.Doc.ID.Hex()
 | 
				
			||||||
 | 
								for _, elem := range coll.Doc.Logs {
 | 
				
			||||||
 | 
									if elem.EID == model.VersionSortChangeID {
 | 
				
			||||||
 | 
										*sortVersion = uint64(elem.Version)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
 | 
					func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) {
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
@ -707,7 +722,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con
 | 
				
			|||||||
	if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
						if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
 | 
						g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion)
 | 
				
			||||||
	g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
 | 
						g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -65,7 +65,10 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
 | 
				
			|||||||
	if group.Status == constant.GroupStatusDismissed {
 | 
						if group.Status == constant.GroupStatusDismissed {
 | 
				
			||||||
		return nil, servererrs.ErrDismissedAlready.Wrap()
 | 
							return nil, servererrs.ErrDismissedAlready.Wrap()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	var hasGroupUpdate bool
 | 
						var (
 | 
				
			||||||
 | 
							hasGroupUpdate bool
 | 
				
			||||||
 | 
							sortVersion    uint64
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
 | 
						opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
 | 
				
			||||||
		Ctx:           ctx,
 | 
							Ctx:           ctx,
 | 
				
			||||||
		VersionKey:    req.GroupID,
 | 
							VersionKey:    req.GroupID,
 | 
				
			||||||
@ -76,14 +79,20 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
 | 
				
			|||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				return nil, err
 | 
									return nil, err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool {
 | 
								logs := make([]model.VersionLogElem, 0, len(vl.Logs))
 | 
				
			||||||
				if elem.EID == "" {
 | 
								for i, log := range vl.Logs {
 | 
				
			||||||
 | 
									switch log.EID {
 | 
				
			||||||
 | 
									case model.VersionGroupChangeID:
 | 
				
			||||||
					vl.LogLen--
 | 
										vl.LogLen--
 | 
				
			||||||
					hasGroupUpdate = true
 | 
										hasGroupUpdate = true
 | 
				
			||||||
					return true
 | 
									case model.VersionSortChangeID:
 | 
				
			||||||
 | 
										vl.LogLen--
 | 
				
			||||||
 | 
										sortVersion = uint64(log.Version)
 | 
				
			||||||
 | 
									default:
 | 
				
			||||||
 | 
										logs = append(logs, vl.Logs[i])
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				return false
 | 
								}
 | 
				
			||||||
			})
 | 
								vl.Logs = logs
 | 
				
			||||||
			if vl.LogLen > 0 {
 | 
								if vl.LogLen > 0 {
 | 
				
			||||||
				hasGroupUpdate = true
 | 
									hasGroupUpdate = true
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@ -95,12 +104,13 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
 | 
							Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
 | 
				
			||||||
			return &pbgroup.GetIncrementalGroupMemberResp{
 | 
								return &pbgroup.GetIncrementalGroupMemberResp{
 | 
				
			||||||
				VersionID: version.ID.Hex(),
 | 
									VersionID:   version.ID.Hex(),
 | 
				
			||||||
				Version:   uint64(version.Version),
 | 
									Version:     uint64(version.Version),
 | 
				
			||||||
				Full:      full,
 | 
									Full:        full,
 | 
				
			||||||
				Delete:    delIDs,
 | 
									Delete:      delIDs,
 | 
				
			||||||
				Insert:    insertList,
 | 
									Insert:      insertList,
 | 
				
			||||||
				Update:    updateList,
 | 
									Update:      updateList,
 | 
				
			||||||
 | 
									SortVersion: sortVersion,
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
				
			|||||||
@ -3,7 +3,6 @@ package user
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"github.com/openimsdk/protocol/constant"
 | 
						"github.com/openimsdk/protocol/constant"
 | 
				
			||||||
	"github.com/openimsdk/protocol/sdkws"
 | 
					 | 
				
			||||||
	pbuser "github.com/openimsdk/protocol/user"
 | 
						pbuser "github.com/openimsdk/protocol/user"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -38,23 +37,6 @@ func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
 | 
					// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
 | 
				
			||||||
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
 | 
					func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
 | 
				
			||||||
	if req.Genre == constant.SubscriberUser {
 | 
					 | 
				
			||||||
		err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return nil, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		var status []*pbuser.OnlineStatus
 | 
					 | 
				
			||||||
		status, err = s.getUsersOnlineStatus(ctx, req.UserIDs)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return nil, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil
 | 
					 | 
				
			||||||
	} else if req.Genre == constant.Unsubscribe {
 | 
					 | 
				
			||||||
		err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return nil, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
 | 
						return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -82,34 +64,12 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu
 | 
				
			|||||||
	if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
 | 
						if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	list, err := s.db.GetSubscribedList(ctx, req.UserID)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, userID := range list {
 | 
					 | 
				
			||||||
		tips := &sdkws.UserStatusChangeTips{
 | 
					 | 
				
			||||||
			FromUserID: req.UserID,
 | 
					 | 
				
			||||||
			ToUserID:   userID,
 | 
					 | 
				
			||||||
			Status:     req.Status,
 | 
					 | 
				
			||||||
			PlatformID: req.PlatformID,
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		s.userNotificationSender.UserStatusChangeNotification(ctx, tips)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return &pbuser.SetUserStatusResp{}, nil
 | 
						return &pbuser.SetUserStatusResp{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetSubscribeUsersStatus Get the online status of subscribers.
 | 
					// GetSubscribeUsersStatus Get the online status of subscribers.
 | 
				
			||||||
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
 | 
					func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
 | 
				
			||||||
	userList, err := s.db.GetAllSubscribeList(ctx, req.UserID)
 | 
						return &pbuser.GetSubscribeUsersStatusResp{}, nil
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
 | 
					func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
 | 
				
			||||||
 | 
				
			|||||||
@ -93,8 +93,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
 | 
				
			|||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
 | 
						userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions())
 | 
				
			||||||
	userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB())
 | 
						database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx())
 | 
				
			||||||
	database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB)
 | 
					 | 
				
			||||||
	friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
 | 
						friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
 | 
				
			||||||
	groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
 | 
						groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
 | 
				
			||||||
	msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
 | 
						msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
 | 
				
			||||||
 | 
				
			|||||||
@ -63,7 +63,7 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s
 | 
				
			|||||||
	for i, key := range keys {
 | 
						for i, key := range keys {
 | 
				
			||||||
		result[i] = pipe.HGet(ctx, key, "CURR")
 | 
							result[i] = pipe.HGet(ctx, key, "CURR")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, err := pipe.Exec(ctx); err != nil {
 | 
						if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
 | 
				
			||||||
		return errs.Wrap(err)
 | 
							return errs.Wrap(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	var notFoundKey []string
 | 
						var notFoundKey []string
 | 
				
			||||||
 | 
				
			|||||||
@ -62,14 +62,6 @@ type UserDatabase interface {
 | 
				
			|||||||
	CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
 | 
						CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
 | 
						SortQuery(ctx context.Context, userIDName map[string]string, asc bool) ([]*model.User, error)
 | 
				
			||||||
	// SubscribeUsersStatus Subscribe a user's presence status
 | 
					 | 
				
			||||||
	SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
 | 
					 | 
				
			||||||
	// UnsubscribeUsersStatus unsubscribe a user's presence status
 | 
					 | 
				
			||||||
	UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error
 | 
					 | 
				
			||||||
	// GetAllSubscribeList Get a list of all subscriptions
 | 
					 | 
				
			||||||
	GetAllSubscribeList(ctx context.Context, userID string) ([]string, error)
 | 
					 | 
				
			||||||
	// GetSubscribedList Get all subscribed lists
 | 
					 | 
				
			||||||
	GetSubscribedList(ctx context.Context, userID string) ([]string, error)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// CRUD user command
 | 
						// CRUD user command
 | 
				
			||||||
	AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
 | 
						AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error
 | 
				
			||||||
@ -80,14 +72,13 @@ type UserDatabase interface {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type userDatabase struct {
 | 
					type userDatabase struct {
 | 
				
			||||||
	tx      tx.Tx
 | 
						tx     tx.Tx
 | 
				
			||||||
	userDB  database.User
 | 
						userDB database.User
 | 
				
			||||||
	cache   cache.UserCache
 | 
						cache  cache.UserCache
 | 
				
			||||||
	mongoDB database.SubscribeUser
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx, mongoDB database.SubscribeUser) UserDatabase {
 | 
					func NewUserDatabase(userDB database.User, cache cache.UserCache, tx tx.Tx) UserDatabase {
 | 
				
			||||||
	return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
 | 
						return &userDatabase{userDB: userDB, cache: cache, tx: tx}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error {
 | 
					func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error {
 | 
				
			||||||
@ -212,36 +203,6 @@ func (u *userDatabase) SortQuery(ctx context.Context, userIDName map[string]stri
 | 
				
			|||||||
	return u.userDB.SortQuery(ctx, userIDName, asc)
 | 
						return u.userDB.SortQuery(ctx, userIDName, asc)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// SubscribeUsersStatus Subscribe or unsubscribe a user's presence status.
 | 
					 | 
				
			||||||
func (u *userDatabase) SubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
 | 
					 | 
				
			||||||
	err := u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
 | 
					 | 
				
			||||||
	return err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// UnsubscribeUsersStatus unsubscribe a user's presence status.
 | 
					 | 
				
			||||||
func (u *userDatabase) UnsubscribeUsersStatus(ctx context.Context, userID string, userIDs []string) error {
 | 
					 | 
				
			||||||
	err := u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
 | 
					 | 
				
			||||||
	return err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetAllSubscribeList Get a list of all subscriptions.
 | 
					 | 
				
			||||||
func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) {
 | 
					 | 
				
			||||||
	list, err := u.mongoDB.GetAllSubscribeList(ctx, userID)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return list, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetSubscribedList Get all subscribed lists.
 | 
					 | 
				
			||||||
func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) {
 | 
					 | 
				
			||||||
	list, err := u.mongoDB.GetSubscribedList(ctx, userID)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return list, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
 | 
					func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error {
 | 
				
			||||||
	return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
 | 
						return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value, ex)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -109,7 +109,13 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU
 | 
				
			|||||||
	return mongoutil.IncrVersion(func() error {
 | 
						return mongoutil.IncrVersion(func() error {
 | 
				
			||||||
		return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
 | 
							return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true)
 | 
				
			||||||
	}, func() error {
 | 
						}, func() error {
 | 
				
			||||||
		return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, model.VersionStateUpdate)
 | 
							var friendUserIDs []string
 | 
				
			||||||
 | 
							if f.IsUpdateIsPinned(args) {
 | 
				
			||||||
 | 
								friendUserIDs = []string{model.VersionSortChangeID, friendUserID}
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								friendUserIDs = []string{friendUserID}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -214,7 +220,7 @@ func (f *FriendMgo) FindFriendUserIDs(ctx context.Context, ownerUserID string) (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) error {
 | 
					func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, val map[string]any) error {
 | 
				
			||||||
	// Ensure there are IDs to update
 | 
						// Ensure there are IDs to update
 | 
				
			||||||
	if len(friendUserIDs) == 0 {
 | 
						if len(friendUserIDs) == 0 || len(val) == 0 {
 | 
				
			||||||
		return nil // Or return an error if you expect there to always be IDs
 | 
							return nil // Or return an error if you expect there to always be IDs
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -230,7 +236,13 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien
 | 
				
			|||||||
	return mongoutil.IncrVersion(func() error {
 | 
						return mongoutil.IncrVersion(func() error {
 | 
				
			||||||
		return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
 | 
							return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update))
 | 
				
			||||||
	}, func() error {
 | 
						}, func() error {
 | 
				
			||||||
		return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate)
 | 
							var userIDs []string
 | 
				
			||||||
 | 
							if f.IsUpdateIsPinned(val) {
 | 
				
			||||||
 | 
								userIDs = append([]string{model.VersionSortChangeID}, friendUserIDs...)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								userIDs = friendUserIDs
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return f.owner.IncrVersion(ctx, ownerUserID, userIDs, model.VersionStateUpdate)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -248,3 +260,11 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) (
 | 
				
			|||||||
func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
 | 
					func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error {
 | 
				
			||||||
	return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state)
 | 
						return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (f *FriendMgo) IsUpdateIsPinned(data map[string]any) bool {
 | 
				
			||||||
 | 
						if data == nil {
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, ok := data["is_pinned"]
 | 
				
			||||||
 | 
						return ok
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -60,7 +60,7 @@ type GroupMemberMgo struct {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *GroupMemberMgo) memberSort() any {
 | 
					func (g *GroupMemberMgo) memberSort() any {
 | 
				
			||||||
	return bson.D{{"role_level", -1}, {"create_time", -1}}
 | 
						return bson.D{{"role_level", -1}, {"create_time", 1}}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {
 | 
					func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {
 | 
				
			||||||
@ -119,7 +119,7 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us
 | 
				
			|||||||
		return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
 | 
							return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID},
 | 
				
			||||||
			bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
 | 
								bson.M{"$set": bson.M{"role_level": roleLevel}}, true)
 | 
				
			||||||
	}, func() error {
 | 
						}, func() error {
 | 
				
			||||||
		return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
 | 
							return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, userID}, model.VersionStateUpdate)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
 | 
					func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error {
 | 
				
			||||||
@ -132,10 +132,9 @@ func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID strin
 | 
				
			|||||||
			bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
 | 
								bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}, func() error {
 | 
						}, func() error {
 | 
				
			||||||
		return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate)
 | 
							return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, firstUserID, secondUserID}, model.VersionStateUpdate)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -146,7 +145,13 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri
 | 
				
			|||||||
	return mongoutil.IncrVersion(func() error {
 | 
						return mongoutil.IncrVersion(func() error {
 | 
				
			||||||
		return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
 | 
							return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true)
 | 
				
			||||||
	}, func() error {
 | 
						}, func() error {
 | 
				
			||||||
		return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate)
 | 
							var userIDs []string
 | 
				
			||||||
 | 
							if g.IsUpdateRoleLevel(data) {
 | 
				
			||||||
 | 
								userIDs = []string{model.VersionSortChangeID, userID}
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								userIDs = []string{userID}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateUpdate)
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1,189 +0,0 @@
 | 
				
			|||||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
					 | 
				
			||||||
// you may not use this file except in compliance with the License.
 | 
					 | 
				
			||||||
// You may obtain a copy of the License at
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// Unless required by applicable law or agreed to in writing, software
 | 
					 | 
				
			||||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
					 | 
				
			||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					 | 
				
			||||||
// See the License for the specific language governing permissions and
 | 
					 | 
				
			||||||
// limitations under the License.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
package mgo
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"context"
 | 
					 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
					 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
					 | 
				
			||||||
	"go.mongodb.org/mongo-driver/bson"
 | 
					 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo"
 | 
					 | 
				
			||||||
	"go.mongodb.org/mongo-driver/mongo/options"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// prefixes and suffixes.
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	SubscriptionPrefix = "subscription_prefix"
 | 
					 | 
				
			||||||
	SubscribedPrefix   = "subscribed_prefix"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// MaximumSubscription Maximum number of subscriptions.
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	MaximumSubscription = 3000
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func NewUserMongoDriver(database *mongo.Database) database.SubscribeUser {
 | 
					 | 
				
			||||||
	return &UserMongoDriver{
 | 
					 | 
				
			||||||
		userCollection: database.Collection(model.SubscribeUserTableName),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type UserMongoDriver struct {
 | 
					 | 
				
			||||||
	userCollection *mongo.Collection
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// AddSubscriptionList Subscriber's handling of thresholds.
 | 
					 | 
				
			||||||
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
 | 
					 | 
				
			||||||
	// Check the number of lists in the key.
 | 
					 | 
				
			||||||
	pipeline := mongo.Pipeline{
 | 
					 | 
				
			||||||
		{{"$match", bson.D{{"user_id", SubscriptionPrefix + userID}}}},
 | 
					 | 
				
			||||||
		{{"$project", bson.D{{"count", bson.D{{"$size", "$user_id_list"}}}}}},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// perform aggregate operations
 | 
					 | 
				
			||||||
	cursor, err := u.userCollection.Aggregate(ctx, pipeline)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return errs.Wrap(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer cursor.Close(ctx)
 | 
					 | 
				
			||||||
	var cnt struct {
 | 
					 | 
				
			||||||
		Count int `bson:"count"`
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// iterate over aggregated results
 | 
					 | 
				
			||||||
	for cursor.Next(ctx) {
 | 
					 | 
				
			||||||
		err = cursor.Decode(&cnt)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return errs.Wrap(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	var newUserIDList []string
 | 
					 | 
				
			||||||
	// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
 | 
					 | 
				
			||||||
	if cnt.Count+len(userIDList) > MaximumSubscription {
 | 
					 | 
				
			||||||
		newUserIDList, err = u.GetAllSubscribeList(ctx, userID)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
 | 
					 | 
				
			||||||
		_, err = u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
			ctx,
 | 
					 | 
				
			||||||
			bson.M{"user_id": SubscriptionPrefix + userID},
 | 
					 | 
				
			||||||
			bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// Another way to subscribe to N before pop,Delete after testing
 | 
					 | 
				
			||||||
		/*for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
 | 
					 | 
				
			||||||
			_, err := u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
				ctx,
 | 
					 | 
				
			||||||
				bson.M{"user_id": SubscriptionPrefix + userID},
 | 
					 | 
				
			||||||
				bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				return err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}*/
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	upsert := true
 | 
					 | 
				
			||||||
	opts := &options.UpdateOptions{
 | 
					 | 
				
			||||||
		Upsert: &upsert,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	_, err = u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
		ctx,
 | 
					 | 
				
			||||||
		bson.M{"user_id": SubscriptionPrefix + userID},
 | 
					 | 
				
			||||||
		bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
 | 
					 | 
				
			||||||
		opts,
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return errs.Wrap(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, user := range userIDList {
 | 
					 | 
				
			||||||
		_, err = u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
			ctx,
 | 
					 | 
				
			||||||
			bson.M{"user_id": SubscribedPrefix + user},
 | 
					 | 
				
			||||||
			bson.M{"$addToSet": bson.M{"user_id_list": userID}},
 | 
					 | 
				
			||||||
			opts,
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return errs.WrapMsg(err, "transaction failed")
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// UnsubscriptionList Handling of unsubscribe.
 | 
					 | 
				
			||||||
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
 | 
					 | 
				
			||||||
	_, err := u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
		ctx,
 | 
					 | 
				
			||||||
		bson.M{"user_id": SubscriptionPrefix + userID},
 | 
					 | 
				
			||||||
		bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return errs.Wrap(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return errs.Wrap(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
 | 
					 | 
				
			||||||
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
 | 
					 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
	for _, userIDTemp := range userIDList {
 | 
					 | 
				
			||||||
		_, err = u.userCollection.UpdateOne(
 | 
					 | 
				
			||||||
			ctx,
 | 
					 | 
				
			||||||
			bson.M{"user_id": SubscribedPrefix + userIDTemp},
 | 
					 | 
				
			||||||
			bson.M{"$pull": bson.M{"user_id_list": userID}},
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return errs.Wrap(err)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetAllSubscribeList Get all users subscribed by this user.
 | 
					 | 
				
			||||||
func (u *UserMongoDriver) GetAllSubscribeList(ctx context.Context, userID string) (userIDList []string, err error) {
 | 
					 | 
				
			||||||
	var user model.SubscribeUser
 | 
					 | 
				
			||||||
	cursor := u.userCollection.FindOne(
 | 
					 | 
				
			||||||
		ctx,
 | 
					 | 
				
			||||||
		bson.M{"user_id": SubscriptionPrefix + userID})
 | 
					 | 
				
			||||||
	err = cursor.Decode(&user)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if err == mongo.ErrNoDocuments {
 | 
					 | 
				
			||||||
			return []string{}, nil
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			return nil, errs.Wrap(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return user.UserIDList, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetSubscribedList Get the user subscribed by those users.
 | 
					 | 
				
			||||||
func (u *UserMongoDriver) GetSubscribedList(ctx context.Context, userID string) (userIDList []string, err error) {
 | 
					 | 
				
			||||||
	var user model.SubscribeUser
 | 
					 | 
				
			||||||
	cursor := u.userCollection.FindOne(
 | 
					 | 
				
			||||||
		ctx,
 | 
					 | 
				
			||||||
		bson.M{"user_id": SubscribedPrefix + userID})
 | 
					 | 
				
			||||||
	err = cursor.Decode(&user)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if err == mongo.ErrNoDocuments {
 | 
					 | 
				
			||||||
			return []string{}, nil
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			return nil, errs.Wrap(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return user.UserIDList, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -19,8 +19,8 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
 | 
					func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
 | 
				
			||||||
	lm := &VersionLogMgo{coll: coll}
 | 
						lm := &VersionLogMgo{coll: coll}
 | 
				
			||||||
	if lm.initIndex(context.Background()) != nil {
 | 
						if err := lm.initIndex(context.Background()); err != nil {
 | 
				
			||||||
		return nil, errs.ErrInternalServer.WrapMsg("init index failed", "coll", coll.Name())
 | 
							return nil, errs.WrapMsg(err, "init version log index failed", "coll", coll.Name())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return lm, nil
 | 
						return lm, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -155,8 +155,24 @@ func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []s
 | 
				
			|||||||
			"$unset": "delete_e_ids",
 | 
								"$unset": "delete_e_ids",
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0})
 | 
						projection := bson.M{
 | 
				
			||||||
	return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
 | 
							"logs": 0,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(projection)
 | 
				
			||||||
 | 
						res, err := mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						res.Logs = make([]model.VersionLogElem, 0, len(eIds))
 | 
				
			||||||
 | 
						for _, id := range eIds {
 | 
				
			||||||
 | 
							res.Logs = append(res.Logs, model.VersionLogElem{
 | 
				
			||||||
 | 
								EID:        id,
 | 
				
			||||||
 | 
								State:      state,
 | 
				
			||||||
 | 
								Version:    res.Version,
 | 
				
			||||||
 | 
								LastUpdate: res.LastUpdate,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return res, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
 | 
					func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) {
 | 
				
			||||||
 | 
				
			|||||||
@ -9,12 +9,12 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Result[V any](val V, err error) V {
 | 
					//func Result[V any](val V, err error) V {
 | 
				
			||||||
	if err != nil {
 | 
					//	if err != nil {
 | 
				
			||||||
		panic(err)
 | 
					//		panic(err)
 | 
				
			||||||
	}
 | 
					//	}
 | 
				
			||||||
	return val
 | 
					//	return val
 | 
				
			||||||
}
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Check(err error) {
 | 
					func Check(err error) {
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@ -30,7 +30,7 @@ func TestName(t *testing.T) {
 | 
				
			|||||||
		panic(err)
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	vl := tmp.(*VersionLogMgo)
 | 
						vl := tmp.(*VersionLogMgo)
 | 
				
			||||||
	res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now())
 | 
						res, err := vl.incrVersionResult(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Log(err)
 | 
							t.Log(err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
 | 
				
			|||||||
@ -1,31 +0,0 @@
 | 
				
			|||||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
					 | 
				
			||||||
// you may not use this file except in compliance with the License.
 | 
					 | 
				
			||||||
// You may obtain a copy of the License at
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// Unless required by applicable law or agreed to in writing, software
 | 
					 | 
				
			||||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
					 | 
				
			||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					 | 
				
			||||||
// See the License for the specific language governing permissions and
 | 
					 | 
				
			||||||
// limitations under the License.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
package database
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import "context"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// SubscribeUser Operation interface of user mongodb.
 | 
					 | 
				
			||||||
type SubscribeUser interface {
 | 
					 | 
				
			||||||
	// AddSubscriptionList Subscriber's handling of thresholds.
 | 
					 | 
				
			||||||
	AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
 | 
					 | 
				
			||||||
	// UnsubscriptionList Handling of unsubscribe.
 | 
					 | 
				
			||||||
	UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
 | 
					 | 
				
			||||||
	// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
 | 
					 | 
				
			||||||
	RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
 | 
					 | 
				
			||||||
	// GetAllSubscribeList Get all users subscribed by this user
 | 
					 | 
				
			||||||
	GetAllSubscribeList(ctx context.Context, id string) (userIDList []string, err error)
 | 
					 | 
				
			||||||
	// GetSubscribedList Get the user subscribed by those users
 | 
					 | 
				
			||||||
	GetSubscribedList(ctx context.Context, id string) (userIDList []string, err error)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -14,6 +14,11 @@ const (
 | 
				
			|||||||
	VersionStateUpdate
 | 
						VersionStateUpdate
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						VersionGroupChangeID = ""
 | 
				
			||||||
 | 
						VersionSortChangeID  = "____S_O_R_T_I_D____"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type VersionLogElem struct {
 | 
					type VersionLogElem struct {
 | 
				
			||||||
	EID        string    `bson:"e_id"`
 | 
						EID        string    `bson:"e_id"`
 | 
				
			||||||
	State      int32     `bson:"state"`
 | 
						State      int32     `bson:"state"`
 | 
				
			||||||
 | 
				
			|||||||
@ -116,7 +116,7 @@ func Main(conf string, del time.Duration) error {
 | 
				
			|||||||
		{
 | 
							{
 | 
				
			||||||
			Prefix: MaxSeq,
 | 
								Prefix: MaxSeq,
 | 
				
			||||||
			GetSeq: cSeq.GetMaxSeq,
 | 
								GetSeq: cSeq.GetMaxSeq,
 | 
				
			||||||
			SetSeq: cSeq.SetMinSeq,
 | 
								SetSeq: cSeq.SetMaxSeq,
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Prefix: MinSeq,
 | 
								Prefix: MinSeq,
 | 
				
			||||||
 | 
				
			|||||||
@ -12,7 +12,7 @@ func main() {
 | 
				
			|||||||
		config string
 | 
							config string
 | 
				
			||||||
		second int
 | 
							second int
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	flag.StringVar(&config, "c", "/Users/chao/Desktop/project/open-im-server/config", "config directory")
 | 
						flag.StringVar(&config, "c", "", "config directory")
 | 
				
			||||||
	flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
 | 
						flag.IntVar(&second, "sec", 3600*24, "delayed deletion of the original seq key after conversion")
 | 
				
			||||||
	flag.Parse()
 | 
						flag.Parse()
 | 
				
			||||||
	if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
 | 
						if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user