mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 03:13:15 +08:00 
			
		
		
		
	Merge branch 'openimsdk:main' into refactor/parse-token
This commit is contained in:
		
						commit
						cac45e484e
					
				@ -16,6 +16,7 @@ package friend
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/mq/memamq"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
				
			||||||
@ -49,6 +50,7 @@ type friendServer struct {
 | 
				
			|||||||
	RegisterCenter        discovery.SvcDiscoveryRegistry
 | 
						RegisterCenter        discovery.SvcDiscoveryRegistry
 | 
				
			||||||
	config                *Config
 | 
						config                *Config
 | 
				
			||||||
	webhookClient         *webhook.Client
 | 
						webhookClient         *webhook.Client
 | 
				
			||||||
 | 
						queue                 *memamq.MemoryQueue
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Config struct {
 | 
					type Config struct {
 | 
				
			||||||
@ -118,8 +120,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
 | 
				
			|||||||
		conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
 | 
							conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
 | 
				
			||||||
		config:                config,
 | 
							config:                config,
 | 
				
			||||||
		webhookClient:         webhook.NewWebhookClient(config.WebhooksConfig.URL),
 | 
							webhookClient:         webhook.NewWebhookClient(config.WebhooksConfig.URL),
 | 
				
			||||||
 | 
							queue:                 memamq.NewMemoryQueue(128, 1024*8),
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -4,6 +4,7 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
 | 
				
			||||||
	"github.com/openimsdk/protocol/sdkws"
 | 
						"github.com/openimsdk/protocol/sdkws"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/log"
 | 
				
			||||||
	"slices"
 | 
						"slices"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
 | 
						"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
 | 
				
			||||||
@ -17,14 +18,23 @@ func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *rela
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, userID := range userIDs {
 | 
						if len(userIDs) > 0 {
 | 
				
			||||||
		if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil {
 | 
							friendUserIDs := []string{req.UserID}
 | 
				
			||||||
			return nil, err
 | 
							noCancelCtx := context.WithoutCancel(ctx)
 | 
				
			||||||
 | 
							err := s.queue.PushCtx(ctx, func() {
 | 
				
			||||||
 | 
								for _, userID := range userIDs {
 | 
				
			||||||
 | 
									if err := s.db.OwnerIncrVersion(noCancelCtx, userID, friendUserIDs, model.VersionStateUpdate); err != nil {
 | 
				
			||||||
 | 
										log.ZError(ctx, "OwnerIncrVersion", err, "userID", userID, "friendUserIDs", friendUserIDs)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								for _, userID := range userIDs {
 | 
				
			||||||
 | 
									s.notificationSender.FriendInfoUpdatedNotification(noCancelCtx, req.UserID, userID)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.ZError(ctx, "NotificationUserInfoUpdate timeout", err, "userID", req.UserID)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, userID := range userIDs {
 | 
					 | 
				
			||||||
		s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &relation.NotificationUserInfoUpdateResp{}, nil
 | 
						return &relation.NotificationUserInfoUpdateResp{}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -17,6 +17,7 @@ package group
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
 | 
				
			||||||
@ -126,25 +127,8 @@ func (g *GroupNotificationSender) getGroupInfo(ctx context.Context, groupID stri
 | 
				
			|||||||
	if len(ownerUserIDs) > 0 {
 | 
						if len(ownerUserIDs) > 0 {
 | 
				
			||||||
		ownerUserID = ownerUserIDs[0]
 | 
							ownerUserID = ownerUserIDs[0]
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &sdkws.GroupInfo{
 | 
					
 | 
				
			||||||
		GroupID:                gm.GroupID,
 | 
						return convert.Db2PbGroupInfo(gm, ownerUserID, num), nil
 | 
				
			||||||
		GroupName:              gm.GroupName,
 | 
					 | 
				
			||||||
		Notification:           gm.Notification,
 | 
					 | 
				
			||||||
		Introduction:           gm.Introduction,
 | 
					 | 
				
			||||||
		FaceURL:                gm.FaceURL,
 | 
					 | 
				
			||||||
		OwnerUserID:            ownerUserID,
 | 
					 | 
				
			||||||
		CreateTime:             gm.CreateTime.UnixMilli(),
 | 
					 | 
				
			||||||
		MemberCount:            num,
 | 
					 | 
				
			||||||
		Ex:                     gm.Ex,
 | 
					 | 
				
			||||||
		Status:                 gm.Status,
 | 
					 | 
				
			||||||
		CreatorUserID:          gm.CreatorUserID,
 | 
					 | 
				
			||||||
		GroupType:              gm.GroupType,
 | 
					 | 
				
			||||||
		NeedVerification:       gm.NeedVerification,
 | 
					 | 
				
			||||||
		LookMemberInfo:         gm.LookMemberInfo,
 | 
					 | 
				
			||||||
		ApplyMemberFriend:      gm.ApplyMemberFriend,
 | 
					 | 
				
			||||||
		NotificationUpdateTime: gm.NotificationUpdateTime.UnixMilli(),
 | 
					 | 
				
			||||||
		NotificationUserID:     gm.NotificationUserID,
 | 
					 | 
				
			||||||
	}, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
 | 
					func (g *GroupNotificationSender) getGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
 | 
				
			||||||
@ -198,29 +182,6 @@ func (g *GroupNotificationSender) getGroupOwnerAndAdminUserID(ctx context.Contex
 | 
				
			|||||||
	return datautil.Slice(members, fn), nil
 | 
						return datautil.Slice(members, fn), nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//nolint:unused
 | 
					 | 
				
			||||||
func (g *GroupNotificationSender) groupDB2PB(group *model.Group, ownerUserID string, memberCount uint32) *sdkws.GroupInfo {
 | 
					 | 
				
			||||||
	return &sdkws.GroupInfo{
 | 
					 | 
				
			||||||
		GroupID:                group.GroupID,
 | 
					 | 
				
			||||||
		GroupName:              group.GroupName,
 | 
					 | 
				
			||||||
		Notification:           group.Notification,
 | 
					 | 
				
			||||||
		Introduction:           group.Introduction,
 | 
					 | 
				
			||||||
		FaceURL:                group.FaceURL,
 | 
					 | 
				
			||||||
		OwnerUserID:            ownerUserID,
 | 
					 | 
				
			||||||
		CreateTime:             group.CreateTime.UnixMilli(),
 | 
					 | 
				
			||||||
		MemberCount:            memberCount,
 | 
					 | 
				
			||||||
		Ex:                     group.Ex,
 | 
					 | 
				
			||||||
		Status:                 group.Status,
 | 
					 | 
				
			||||||
		CreatorUserID:          group.CreatorUserID,
 | 
					 | 
				
			||||||
		GroupType:              group.GroupType,
 | 
					 | 
				
			||||||
		NeedVerification:       group.NeedVerification,
 | 
					 | 
				
			||||||
		LookMemberInfo:         group.LookMemberInfo,
 | 
					 | 
				
			||||||
		ApplyMemberFriend:      group.ApplyMemberFriend,
 | 
					 | 
				
			||||||
		NotificationUpdateTime: group.NotificationUpdateTime.UnixMilli(),
 | 
					 | 
				
			||||||
		NotificationUserID:     group.NotificationUserID,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo {
 | 
					func (g *GroupNotificationSender) groupMemberDB2PB(member *model.GroupMember, appMangerLevel int32) *sdkws.GroupMemberFullInfo {
 | 
				
			||||||
	return &sdkws.GroupMemberFullInfo{
 | 
						return &sdkws.GroupMemberFullInfo{
 | 
				
			||||||
		GroupID:        member.GroupID,
 | 
							GroupID:        member.GroupID,
 | 
				
			||||||
 | 
				
			|||||||
@ -111,7 +111,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) {
 | 
					func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) {
 | 
				
			||||||
	var chatLogs []*sdkws.MsgData
 | 
						var chatLogs []*sdkws.MsgData
 | 
				
			||||||
	var total int32
 | 
						var total int64
 | 
				
			||||||
	resp = &msg.SearchMessageResp{}
 | 
						resp = &msg.SearchMessageResp{}
 | 
				
			||||||
	if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil {
 | 
						if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
@ -194,7 +194,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		resp.ChatLogs = append(resp.ChatLogs, pbchatLog)
 | 
							resp.ChatLogs = append(resp.ChatLogs, pbchatLog)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	resp.ChatLogsNum = total
 | 
						resp.ChatLogsNum = int32(total)
 | 
				
			||||||
	return resp, nil
 | 
						return resp, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										2
									
								
								pkg/common/storage/cache/redis/batch_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								pkg/common/storage/cache/redis/batch_test.go
									
									
									
									
										vendored
									
									
								
							@ -45,7 +45,7 @@ func TestName(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser)
 | 
						seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	res, err := seqUser.GetReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
 | 
						res, err := seqUser.GetUserReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		panic(err)
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										32
									
								
								pkg/common/storage/cache/redis/seq_user_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										32
									
								
								pkg/common/storage/cache/redis/seq_user_test.go
									
									
									
									
										vendored
									
									
								
							@ -4,7 +4,10 @@ import (
 | 
				
			|||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
				
			||||||
 | 
						mgo2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						"github.com/redis/go-redis/v9"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo/options"
 | 
				
			||||||
	"log"
 | 
						"log"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync/atomic"
 | 
						"sync/atomic"
 | 
				
			||||||
@ -77,3 +80,32 @@ func TestRecvOnline(t *testing.T) {
 | 
				
			|||||||
		fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
 | 
							fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestName1(t *testing.T) {
 | 
				
			||||||
 | 
						opt := &redis.Options{
 | 
				
			||||||
 | 
							Addr:     "172.16.8.48:16379",
 | 
				
			||||||
 | 
							Password: "openIM123",
 | 
				
			||||||
 | 
							DB:       0,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						rdb := redis.NewClient(opt)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mgo, err := mongo.Connect(context.Background(),
 | 
				
			||||||
 | 
							options.Client().
 | 
				
			||||||
 | 
								ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
 | 
				
			||||||
 | 
								SetConnectTimeout(5*time.Second))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						model, err := mgo2.NewSeqUserMongo(mgo.Database("openim_v3"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						seq := NewSeqUserCacheRedis(rdb, model)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						res, err := seq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.Log(res)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -84,7 +84,7 @@ type CommonMsgDatabase interface {
 | 
				
			|||||||
	//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
 | 
						//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
 | 
				
			||||||
	SetSendMsgStatus(ctx context.Context, id string, status int32) error
 | 
						SetSendMsgStatus(ctx context.Context, id string, status int32) error
 | 
				
			||||||
	GetSendMsgStatus(ctx context.Context, id string) (int32, error)
 | 
						GetSendMsgStatus(ctx context.Context, id string) (int32, error)
 | 
				
			||||||
	SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
 | 
						SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error)
 | 
				
			||||||
	FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
 | 
						FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// to mq
 | 
						// to mq
 | 
				
			||||||
@ -878,7 +878,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
 | 
				
			|||||||
	return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
 | 
						return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) {
 | 
					func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error) {
 | 
				
			||||||
	var totalMsgs []*sdkws.MsgData
 | 
						var totalMsgs []*sdkws.MsgData
 | 
				
			||||||
	total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
 | 
						total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 | 
				
			|||||||
@ -278,124 +278,409 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
 | 
					//func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) {
 | 
				
			||||||
	where := make(bson.A, 0, 6)
 | 
					//
 | 
				
			||||||
 | 
					//	return nil, nil
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) {
 | 
				
			||||||
 | 
					//	var pipeline bson.A
 | 
				
			||||||
 | 
					//	if !nextID.IsZero() {
 | 
				
			||||||
 | 
					//		pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	pipeline = append(pipeline,
 | 
				
			||||||
 | 
					//		bson.M{"$match": filter},
 | 
				
			||||||
 | 
					//		bson.M{"$limit": limit},
 | 
				
			||||||
 | 
					//		bson.M{"$unwind": "$msgs"},
 | 
				
			||||||
 | 
					//		bson.M{"$match": filter},
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$group": bson.M{
 | 
				
			||||||
 | 
					//				"_id": "$_id",
 | 
				
			||||||
 | 
					//				"doc_id": bson.M{
 | 
				
			||||||
 | 
					//					"$first": "$doc_id",
 | 
				
			||||||
 | 
					//				},
 | 
				
			||||||
 | 
					//				"msgs": bson.M{"$push": "$msgs"},
 | 
				
			||||||
 | 
					//			},
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//	)
 | 
				
			||||||
 | 
					//	if !content {
 | 
				
			||||||
 | 
					//		pipeline = append(pipeline,
 | 
				
			||||||
 | 
					//			bson.M{
 | 
				
			||||||
 | 
					//				"$project": bson.M{
 | 
				
			||||||
 | 
					//					"_id":   1,
 | 
				
			||||||
 | 
					//					"count": bson.M{"$size": "$msgs"},
 | 
				
			||||||
 | 
					//				},
 | 
				
			||||||
 | 
					//			},
 | 
				
			||||||
 | 
					//		)
 | 
				
			||||||
 | 
					//		type result struct {
 | 
				
			||||||
 | 
					//			ID    primitive.ObjectID `bson:"_id"`
 | 
				
			||||||
 | 
					//			Count int64              `bson:"count"`
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
					//		if err != nil {
 | 
				
			||||||
 | 
					//			return 0, nil, primitive.ObjectID{}, err
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		if len(res) == 0 {
 | 
				
			||||||
 | 
					//			return 0, nil, primitive.ObjectID{}, nil
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		var count int64
 | 
				
			||||||
 | 
					//		for _, r := range res {
 | 
				
			||||||
 | 
					//			count += r.Count
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		return count, nil, res[len(res)-1].ID, nil
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	type result struct {
 | 
				
			||||||
 | 
					//		ID  primitive.ObjectID    `bson:"_id"`
 | 
				
			||||||
 | 
					//		Msg []*model.MsgInfoModel `bson:"msgs"`
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
					//	if err != nil {
 | 
				
			||||||
 | 
					//		return 0, nil, primitive.ObjectID{}, err
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if len(res) == 0 {
 | 
				
			||||||
 | 
					//		return 0, nil, primitive.ObjectID{}, err
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	var count int
 | 
				
			||||||
 | 
					//	for _, r := range res {
 | 
				
			||||||
 | 
					//		count += len(r.Msg)
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	msgs := make([]*model.MsgInfoModel, 0, count)
 | 
				
			||||||
 | 
					//	for _, r := range res {
 | 
				
			||||||
 | 
					//		msgs = append(msgs, r.Msg...)
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	return int64(count), msgs, res[len(res)-1].ID, nil
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					db.msg3.aggregate(
 | 
				
			||||||
 | 
					    [
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            "$match": {
 | 
				
			||||||
 | 
					                "doc_id": "si_7009965934_8710838466:0"
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type searchMessageIndex struct {
 | 
				
			||||||
 | 
						ID    primitive.ObjectID `bson:"_id"`
 | 
				
			||||||
 | 
						Index []int64            `bson:"index"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID primitive.ObjectID, limit int) ([]searchMessageIndex, error) {
 | 
				
			||||||
 | 
						var pipeline bson.A
 | 
				
			||||||
 | 
						if !nextID.IsZero() {
 | 
				
			||||||
 | 
							pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pipeline = append(pipeline,
 | 
				
			||||||
 | 
							bson.M{"$sort": bson.M{"_id": 1}},
 | 
				
			||||||
 | 
							bson.M{"$match": filter},
 | 
				
			||||||
 | 
							bson.M{"$limit": limit},
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$project": bson.M{
 | 
				
			||||||
 | 
									"_id": 1,
 | 
				
			||||||
 | 
									"msgs": bson.M{
 | 
				
			||||||
 | 
										"$map": bson.M{
 | 
				
			||||||
 | 
											"input": "$msgs",
 | 
				
			||||||
 | 
											"as":    "msg",
 | 
				
			||||||
 | 
											"in": bson.M{
 | 
				
			||||||
 | 
												"$mergeObjects": bson.A{
 | 
				
			||||||
 | 
													"$$msg",
 | 
				
			||||||
 | 
													bson.M{
 | 
				
			||||||
 | 
														"_search_temp_index": bson.M{
 | 
				
			||||||
 | 
															"$indexOfArray": bson.A{
 | 
				
			||||||
 | 
																"$msgs", "$$msg",
 | 
				
			||||||
 | 
															},
 | 
				
			||||||
 | 
														},
 | 
				
			||||||
 | 
													},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							bson.M{"$unwind": "$msgs"},
 | 
				
			||||||
 | 
							bson.M{"$match": filter},
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$project": bson.M{
 | 
				
			||||||
 | 
									"_id":                     1,
 | 
				
			||||||
 | 
									"msgs._search_temp_index": 1,
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$group": bson.M{
 | 
				
			||||||
 | 
									"_id":   "$_id",
 | 
				
			||||||
 | 
									"index": bson.M{"$push": "$msgs._search_temp_index"},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							bson.M{"$sort": bson.M{"_id": 1}},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						return mongoutil.Aggregate[searchMessageIndex](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) {
 | 
				
			||||||
 | 
						filter := bson.M{}
 | 
				
			||||||
	if req.RecvID != "" {
 | 
						if req.RecvID != "" {
 | 
				
			||||||
		where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
 | 
							filter["$or"] = bson.A{
 | 
				
			||||||
 | 
								bson.M{"msgs.msg.recv_id": req.RecvID},
 | 
				
			||||||
 | 
								bson.M{"msgs.msg.group_id": req.RecvID},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if req.SendID != "" {
 | 
						if req.SendID != "" {
 | 
				
			||||||
		where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
 | 
							filter["msgs.msg.send_id"] = req.SendID
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if req.ContentType != 0 {
 | 
						if req.ContentType != 0 {
 | 
				
			||||||
		where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
 | 
							filter["msgs.msg.content_type"] = req.ContentType
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if req.SessionType != 0 {
 | 
						if req.SessionType != 0 {
 | 
				
			||||||
		where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
 | 
							filter["msgs.msg.session_type"] = req.SessionType
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if req.SendTime != "" {
 | 
						if req.SendTime != "" {
 | 
				
			||||||
		sendTime, err := time.Parse(time.DateOnly, req.SendTime)
 | 
							sendTime, err := time.Parse(time.DateOnly, req.SendTime)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
 | 
								return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		where = append(where,
 | 
							filter["$and"] = bson.A{
 | 
				
			||||||
			bson.M{
 | 
								bson.M{"msgs.msg.send_time": bson.M{
 | 
				
			||||||
				"msgs.msg.send_time": bson.M{
 | 
									"$gte": sendTime.UnixMilli(),
 | 
				
			||||||
					"$gte": sendTime.UnixMilli(),
 | 
								}},
 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			bson.M{
 | 
								bson.M{
 | 
				
			||||||
				"msgs.msg.send_time": bson.M{
 | 
									"msgs.msg.send_time": bson.M{
 | 
				
			||||||
					"$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
 | 
										"$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
 | 
				
			||||||
				},
 | 
									},
 | 
				
			||||||
			},
 | 
								},
 | 
				
			||||||
		)
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var (
 | 
				
			||||||
 | 
							nextID    primitive.ObjectID
 | 
				
			||||||
 | 
							count     int
 | 
				
			||||||
 | 
							dataRange []searchMessageIndex
 | 
				
			||||||
 | 
							skip      = int((req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber())
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						_, _ = dataRange, skip
 | 
				
			||||||
 | 
						const maxDoc = 50
 | 
				
			||||||
 | 
						data := make([]searchMessageIndex, 0, req.Pagination.GetShowNumber())
 | 
				
			||||||
 | 
						push := cap(data)
 | 
				
			||||||
 | 
						for i := 0; ; i++ {
 | 
				
			||||||
 | 
							res, err := m.searchMessageIndex(ctx, filter, nextID, maxDoc)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return 0, nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if len(res) > 0 {
 | 
				
			||||||
 | 
								nextID = res[len(res)-1].ID
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for _, r := range res {
 | 
				
			||||||
 | 
								var dataIndex []int64
 | 
				
			||||||
 | 
								for _, index := range r.Index {
 | 
				
			||||||
 | 
									if push > 0 && count >= skip {
 | 
				
			||||||
 | 
										dataIndex = append(dataIndex, index)
 | 
				
			||||||
 | 
										push--
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									count++
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if len(dataIndex) > 0 {
 | 
				
			||||||
 | 
									data = append(data, searchMessageIndex{
 | 
				
			||||||
 | 
										ID:    r.ID,
 | 
				
			||||||
 | 
										Index: dataIndex,
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if push <= 0 {
 | 
				
			||||||
 | 
								push--
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if len(res) < maxDoc || push < -10 {
 | 
				
			||||||
 | 
								return int64(count), data, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) {
 | 
				
			||||||
 | 
						if len(index) == 0 {
 | 
				
			||||||
 | 
							return nil, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pipeline := bson.A{
 | 
						pipeline := bson.A{
 | 
				
			||||||
		bson.M{
 | 
							bson.M{"$match": bson.M{"_id": id}},
 | 
				
			||||||
			"$unwind": "$msgs",
 | 
							bson.M{"$project": "$msgs"},
 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if len(where) > 0 {
 | 
					 | 
				
			||||||
		pipeline = append(pipeline, bson.M{
 | 
					 | 
				
			||||||
			"$match": bson.M{"$and": where},
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	pipeline = append(pipeline,
 | 
					 | 
				
			||||||
		bson.M{
 | 
					 | 
				
			||||||
			"$project": bson.M{
 | 
					 | 
				
			||||||
				"_id": 0,
 | 
					 | 
				
			||||||
				"msg": "$msgs.msg",
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		bson.M{
 | 
					 | 
				
			||||||
			"$count": "count",
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return 0, nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(count) == 0 || count[0] == 0 {
 | 
					 | 
				
			||||||
		return 0, nil, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	pipeline = pipeline[:len(pipeline)-1]
 | 
					 | 
				
			||||||
	pipeline = append(pipeline,
 | 
					 | 
				
			||||||
		bson.M{
 | 
					 | 
				
			||||||
			"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		bson.M{
 | 
					 | 
				
			||||||
			"$limit": req.Pagination.GetShowNumber(),
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
	msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
 | 
						msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return msgs, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) {
 | 
				
			||||||
 | 
						count, data, err := m.searchMessage(ctx, req)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return 0, nil, err
 | 
							return 0, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for i := range msgs {
 | 
						var msgs []*model.MsgInfoModel
 | 
				
			||||||
		msgInfo := msgs[i]
 | 
						if len(data) > 0 {
 | 
				
			||||||
		if msgInfo == nil || msgInfo.Msg == nil {
 | 
							var n int
 | 
				
			||||||
			continue
 | 
							for _, d := range data {
 | 
				
			||||||
 | 
								n += len(d.Index)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if msgInfo.Revoke != nil {
 | 
							msgs = make([]*model.MsgInfoModel, 0, n)
 | 
				
			||||||
			revokeContent := sdkws.MessageRevokedContent{
 | 
						}
 | 
				
			||||||
				RevokerID:                   msgInfo.Revoke.UserID,
 | 
						for _, val := range data {
 | 
				
			||||||
				RevokerRole:                 msgInfo.Revoke.Role,
 | 
							res, err := mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"_id": val.ID})
 | 
				
			||||||
				ClientMsgID:                 msgInfo.Msg.ClientMsgID,
 | 
							if err != nil {
 | 
				
			||||||
				RevokerNickname:             msgInfo.Revoke.Nickname,
 | 
								return 0, nil, err
 | 
				
			||||||
				RevokeTime:                  msgInfo.Revoke.Time,
 | 
							}
 | 
				
			||||||
				SourceMessageSendTime:       msgInfo.Msg.SendTime,
 | 
							for _, i := range val.Index {
 | 
				
			||||||
				SourceMessageSendID:         msgInfo.Msg.SendID,
 | 
								if i >= int64(len(res.Msg)) {
 | 
				
			||||||
				SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
 | 
									continue
 | 
				
			||||||
				SessionType:                 msgInfo.Msg.SessionType,
 | 
					 | 
				
			||||||
				Seq:                         msgInfo.Msg.Seq,
 | 
					 | 
				
			||||||
				Ex:                          msgInfo.Msg.Ex,
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			data, err := jsonutil.JsonMarshal(&revokeContent)
 | 
								msgs = append(msgs, res.Msg[i])
 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			elem := sdkws.NotificationElem{Detail: string(data)}
 | 
					 | 
				
			||||||
			content, err := jsonutil.JsonMarshal(&elem)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			msgInfo.Msg.ContentType = constant.MsgRevokeNotification
 | 
					 | 
				
			||||||
			msgInfo.Msg.Content = string(content)
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
 | 
						return count, msgs, nil
 | 
				
			||||||
	//n := int32(len(msgs))
 | 
					 | 
				
			||||||
	//if start >= n {
 | 
					 | 
				
			||||||
	//	return n, []*relation.MsgInfoModel{}, nil
 | 
					 | 
				
			||||||
	//}
 | 
					 | 
				
			||||||
	//if start+req.Pagination.ShowNumber < n {
 | 
					 | 
				
			||||||
	//	msgs = msgs[start : start+req.Pagination.ShowNumber]
 | 
					 | 
				
			||||||
	//} else {
 | 
					 | 
				
			||||||
	//	msgs = msgs[start:]
 | 
					 | 
				
			||||||
	//}
 | 
					 | 
				
			||||||
	return count[0], msgs, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
 | 
				
			||||||
 | 
					//	where := make(bson.A, 0, 6)
 | 
				
			||||||
 | 
					//	if req.RecvID != "" {
 | 
				
			||||||
 | 
					//		if req.SessionType == constant.ReadGroupChatType {
 | 
				
			||||||
 | 
					//			where = append(where, bson.M{
 | 
				
			||||||
 | 
					//				"$or": bson.A{
 | 
				
			||||||
 | 
					//					bson.M{"doc_id": "^n_" + req.RecvID + ":"},
 | 
				
			||||||
 | 
					//					bson.M{"doc_id": "^sg_" + req.RecvID + ":"},
 | 
				
			||||||
 | 
					//				},
 | 
				
			||||||
 | 
					//			})
 | 
				
			||||||
 | 
					//		} else {
 | 
				
			||||||
 | 
					//			where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if req.SendID != "" {
 | 
				
			||||||
 | 
					//		where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if req.ContentType != 0 {
 | 
				
			||||||
 | 
					//		where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if req.SessionType != 0 {
 | 
				
			||||||
 | 
					//		where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if req.SendTime != "" {
 | 
				
			||||||
 | 
					//		sendTime, err := time.Parse(time.DateOnly, req.SendTime)
 | 
				
			||||||
 | 
					//		if err != nil {
 | 
				
			||||||
 | 
					//			return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		where = append(where,
 | 
				
			||||||
 | 
					//			bson.M{
 | 
				
			||||||
 | 
					//				"msgs.msg.send_time": bson.M{
 | 
				
			||||||
 | 
					//					"$gte": sendTime.UnixMilli(),
 | 
				
			||||||
 | 
					//				},
 | 
				
			||||||
 | 
					//			},
 | 
				
			||||||
 | 
					//			bson.M{
 | 
				
			||||||
 | 
					//				"msgs.msg.send_time": bson.M{
 | 
				
			||||||
 | 
					//					"$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
 | 
				
			||||||
 | 
					//				},
 | 
				
			||||||
 | 
					//			},
 | 
				
			||||||
 | 
					//		)
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	opt := options.Find().SetLimit(100)
 | 
				
			||||||
 | 
					//	res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt)
 | 
				
			||||||
 | 
					//	if err != nil {
 | 
				
			||||||
 | 
					//		return 0, nil, err
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	_ = res
 | 
				
			||||||
 | 
					//	fmt.Println()
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//	return 0, nil, nil
 | 
				
			||||||
 | 
					//	pipeline := bson.A{
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$unwind": "$msgs",
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	if len(where) > 0 {
 | 
				
			||||||
 | 
					//		pipeline = append(pipeline, bson.M{
 | 
				
			||||||
 | 
					//			"$match": bson.M{"$and": where},
 | 
				
			||||||
 | 
					//		})
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	pipeline = append(pipeline,
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$project": bson.M{
 | 
				
			||||||
 | 
					//				"_id": 0,
 | 
				
			||||||
 | 
					//				"msg": "$msgs.msg",
 | 
				
			||||||
 | 
					//			},
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$count": "count",
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//	)
 | 
				
			||||||
 | 
					//	//count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
					//	//if err != nil {
 | 
				
			||||||
 | 
					//	//	return 0, nil, err
 | 
				
			||||||
 | 
					//	//}
 | 
				
			||||||
 | 
					//	//if len(count) == 0 || count[0] == 0 {
 | 
				
			||||||
 | 
					//	//	return 0, nil, nil
 | 
				
			||||||
 | 
					//	//}
 | 
				
			||||||
 | 
					//	count := []int32{0}
 | 
				
			||||||
 | 
					//	pipeline = pipeline[:len(pipeline)-1]
 | 
				
			||||||
 | 
					//	pipeline = append(pipeline,
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//		bson.M{
 | 
				
			||||||
 | 
					//			"$limit": req.Pagination.GetShowNumber(),
 | 
				
			||||||
 | 
					//		},
 | 
				
			||||||
 | 
					//	)
 | 
				
			||||||
 | 
					//	msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
					//	if err != nil {
 | 
				
			||||||
 | 
					//		return 0, nil, err
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	for i := range msgs {
 | 
				
			||||||
 | 
					//		msgInfo := msgs[i]
 | 
				
			||||||
 | 
					//		if msgInfo == nil || msgInfo.Msg == nil {
 | 
				
			||||||
 | 
					//			continue
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//		if msgInfo.Revoke != nil {
 | 
				
			||||||
 | 
					//			revokeContent := sdkws.MessageRevokedContent{
 | 
				
			||||||
 | 
					//				RevokerID:                   msgInfo.Revoke.UserID,
 | 
				
			||||||
 | 
					//				RevokerRole:                 msgInfo.Revoke.Role,
 | 
				
			||||||
 | 
					//				ClientMsgID:                 msgInfo.Msg.ClientMsgID,
 | 
				
			||||||
 | 
					//				RevokerNickname:             msgInfo.Revoke.Nickname,
 | 
				
			||||||
 | 
					//				RevokeTime:                  msgInfo.Revoke.Time,
 | 
				
			||||||
 | 
					//				SourceMessageSendTime:       msgInfo.Msg.SendTime,
 | 
				
			||||||
 | 
					//				SourceMessageSendID:         msgInfo.Msg.SendID,
 | 
				
			||||||
 | 
					//				SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
 | 
				
			||||||
 | 
					//				SessionType:                 msgInfo.Msg.SessionType,
 | 
				
			||||||
 | 
					//				Seq:                         msgInfo.Msg.Seq,
 | 
				
			||||||
 | 
					//				Ex:                          msgInfo.Msg.Ex,
 | 
				
			||||||
 | 
					//			}
 | 
				
			||||||
 | 
					//			data, err := jsonutil.JsonMarshal(&revokeContent)
 | 
				
			||||||
 | 
					//			if err != nil {
 | 
				
			||||||
 | 
					//				return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
 | 
				
			||||||
 | 
					//			}
 | 
				
			||||||
 | 
					//			elem := sdkws.NotificationElem{Detail: string(data)}
 | 
				
			||||||
 | 
					//			content, err := jsonutil.JsonMarshal(&elem)
 | 
				
			||||||
 | 
					//			if err != nil {
 | 
				
			||||||
 | 
					//				return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
 | 
				
			||||||
 | 
					//			}
 | 
				
			||||||
 | 
					//			msgInfo.Msg.ContentType = constant.MsgRevokeNotification
 | 
				
			||||||
 | 
					//			msgInfo.Msg.Content = string(content)
 | 
				
			||||||
 | 
					//		}
 | 
				
			||||||
 | 
					//	}
 | 
				
			||||||
 | 
					//	//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
 | 
				
			||||||
 | 
					//	//n := int32(len(msgs))
 | 
				
			||||||
 | 
					//	//if start >= n {
 | 
				
			||||||
 | 
					//	//	return n, []*relation.MsgInfoModel{}, nil
 | 
				
			||||||
 | 
					//	//}
 | 
				
			||||||
 | 
					//	//if start+req.Pagination.ShowNumber < n {
 | 
				
			||||||
 | 
					//	//	msgs = msgs[start : start+req.Pagination.ShowNumber]
 | 
				
			||||||
 | 
					//	//} else {
 | 
				
			||||||
 | 
					//	//	msgs = msgs[start:]
 | 
				
			||||||
 | 
					//	//}
 | 
				
			||||||
 | 
					//	return count[0], msgs, nil
 | 
				
			||||||
 | 
					//}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
 | 
					func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
 | 
				
			||||||
	var sort int
 | 
						var sort int
 | 
				
			||||||
	if ase {
 | 
						if ase {
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										75
									
								
								pkg/common/storage/database/mgo/msg_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								pkg/common/storage/database/mgo/msg_test.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,75 @@
 | 
				
			|||||||
 | 
					package mgo
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
				
			||||||
 | 
						"github.com/openimsdk/protocol/msg"
 | 
				
			||||||
 | 
						"github.com/openimsdk/protocol/sdkws"
 | 
				
			||||||
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/bson"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo"
 | 
				
			||||||
 | 
						"go.mongodb.org/mongo-driver/mongo/options"
 | 
				
			||||||
 | 
						"math/rand"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestName1(t *testing.T) {
 | 
				
			||||||
 | 
						ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						v := &MsgMgo{
 | 
				
			||||||
 | 
							coll: cli.Database("openim_v3").Collection("msg3"),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						req := &msg.SearchMessageReq{
 | 
				
			||||||
 | 
							//RecvID: "3187706596",
 | 
				
			||||||
 | 
							//SendID:      "7009965934",
 | 
				
			||||||
 | 
							ContentType: 101,
 | 
				
			||||||
 | 
							//SendTime:    "2024-05-06",
 | 
				
			||||||
 | 
							//SessionType: 3,
 | 
				
			||||||
 | 
							Pagination: &sdkws.RequestPagination{
 | 
				
			||||||
 | 
								PageNumber: 1,
 | 
				
			||||||
 | 
								ShowNumber: 10,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						total, res, err := v.SearchMessage(ctx, req)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for i, re := range res {
 | 
				
			||||||
 | 
							t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Log(total)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestName10(t *testing.T) {
 | 
				
			||||||
 | 
						ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						v := &MsgMgo{
 | 
				
			||||||
 | 
							coll: cli.Database("openim_v3").Collection("msg3"),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						opt := options.Find().SetLimit(1000)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						res, err := mongoutil.Find[model.MsgDocModel](ctx, v.coll, bson.M{}, opt)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ctx = context.Background()
 | 
				
			||||||
 | 
						for i := 0; i < 100000; i++ {
 | 
				
			||||||
 | 
							for j := range res {
 | 
				
			||||||
 | 
								res[j].DocID = strconv.FormatUint(rand.Uint64(), 10) + ":0"
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if err := mongoutil.InsertMany(ctx, v.coll, res); err != nil {
 | 
				
			||||||
 | 
								panic(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							t.Log("====>", time.Now(), i)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -26,7 +26,7 @@ func Mongodb() *mongo.Database {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func TestUserSeq(t *testing.T) {
 | 
					func TestUserSeq(t *testing.T) {
 | 
				
			||||||
	uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
 | 
						uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
 | 
				
			||||||
	t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4))
 | 
						t.Log(uSeq.SetUserMinSeq(context.Background(), "1000", "2000", 4))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestConversationSeq(t *testing.T) {
 | 
					func TestConversationSeq(t *testing.T) {
 | 
				
			||||||
@ -35,3 +35,8 @@ func TestConversationSeq(t *testing.T) {
 | 
				
			|||||||
	t.Log(cSeq.Malloc(context.Background(), "2000", 10))
 | 
						t.Log(cSeq.Malloc(context.Background(), "2000", 10))
 | 
				
			||||||
	t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
 | 
						t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestUserGetUserReadSeqs(t *testing.T) {
 | 
				
			||||||
 | 
						uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
 | 
				
			||||||
 | 
						t.Log(uSeq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"}))
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -88,6 +88,14 @@ func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string
 | 
				
			|||||||
	return s.getSeq(ctx, conversationID, userID, "read_seq")
 | 
						return s.getSeq(ctx, conversationID, userID, "read_seq")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *seqUserMongo) notFoundSet0(seq map[string]int64, conversationIDs []string) {
 | 
				
			||||||
 | 
						for _, conversationID := range conversationIDs {
 | 
				
			||||||
 | 
							if _, ok := seq[conversationID]; !ok {
 | 
				
			||||||
 | 
								seq[conversationID] = 0
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
 | 
					func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
 | 
				
			||||||
	if len(conversationID) == 0 {
 | 
						if len(conversationID) == 0 {
 | 
				
			||||||
		return map[string]int64{}, nil
 | 
							return map[string]int64{}, nil
 | 
				
			||||||
@ -102,6 +110,7 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
 | 
				
			|||||||
	for _, seq := range seqs {
 | 
						for _, seq := range seqs {
 | 
				
			||||||
		res[seq.ConversationID] = seq.ReadSeq
 | 
							res[seq.ConversationID] = seq.ReadSeq
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						s.notFoundSet0(res, conversationID)
 | 
				
			||||||
	return res, nil
 | 
						return res, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -37,7 +37,7 @@ type Msg interface {
 | 
				
			|||||||
	GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error)
 | 
						GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error)
 | 
				
			||||||
	DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
 | 
						DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
 | 
				
			||||||
	MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
 | 
						MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
 | 
				
			||||||
	SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error)
 | 
						SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error)
 | 
				
			||||||
	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
 | 
						RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
 | 
				
			||||||
	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
 | 
						RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
 | 
				
			||||||
	ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
 | 
						ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user