mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	Merge remote-tracking branch 'origin/main-transfer-update' into main-transfer-update
This commit is contained in:
		
						commit
						5796b4c8a5
					
				
							
								
								
									
										3
									
								
								.github/workflows/build-docker-image.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.github/workflows/build-docker-image.yml
									
									
									
									
										vendored
									
									
								
							@ -15,8 +15,6 @@
 | 
				
			|||||||
name: Publish Docker image
 | 
					name: Publish Docker image
 | 
				
			||||||
 | 
					
 | 
				
			||||||
on:
 | 
					on:
 | 
				
			||||||
  schedule:
 | 
					 | 
				
			||||||
  - cron: '30 2 * * *'
 | 
					 | 
				
			||||||
  push:
 | 
					  push:
 | 
				
			||||||
    branches:
 | 
					    branches:
 | 
				
			||||||
      - main
 | 
					      - main
 | 
				
			||||||
@ -31,6 +29,7 @@ env:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
jobs:
 | 
					jobs:
 | 
				
			||||||
  build-dockerhub:
 | 
					  build-dockerhub:
 | 
				
			||||||
 | 
					    if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.action == 'closed' && github.event.pull_request.merged == true)
 | 
				
			||||||
    runs-on: ubuntu-latest
 | 
					    runs-on: ubuntu-latest
 | 
				
			||||||
    steps:
 | 
					    steps:
 | 
				
			||||||
      - name: Checkout
 | 
					      - name: Checkout
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								.github/workflows/stale.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/workflows/stale.yml
									
									
									
									
										vendored
									
									
								
							@ -21,7 +21,7 @@ name: Mark stale issues and pull requests
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
on:
 | 
					on:
 | 
				
			||||||
  schedule:
 | 
					  schedule:
 | 
				
			||||||
  - cron: '0 8 * * *'
 | 
					  - cron: '0 8 * * 1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
jobs:
 | 
					jobs:
 | 
				
			||||||
  stale:
 | 
					  stale:
 | 
				
			||||||
@ -36,7 +36,7 @@ jobs:
 | 
				
			|||||||
      with:
 | 
					      with:
 | 
				
			||||||
        repo-token: ${{ secrets.BOT_GITHUB_TOKEN }}
 | 
					        repo-token: ${{ secrets.BOT_GITHUB_TOKEN }}
 | 
				
			||||||
        days-before-stale: 60
 | 
					        days-before-stale: 60
 | 
				
			||||||
        days-before-close: 7
 | 
					        days-before-close: 305
 | 
				
			||||||
        stale-issue-message: 'This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.'
 | 
					        stale-issue-message: 'This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.'
 | 
				
			||||||
        stale-pr-message: 'This issue is stale because it has been open 60 days with no activity.'
 | 
					        stale-pr-message: 'This issue is stale because it has been open 60 days with no activity.'
 | 
				
			||||||
        close-issue-message: 'This issue was closed because it has been stalled for 7 days with no activity.'
 | 
					        close-issue-message: 'This issue was closed because it has been stalled for 7 days with no activity.'
 | 
				
			||||||
 | 
				
			|||||||
@ -43,7 +43,7 @@ COPY --from=builder $SERVER_DIR/start-config.yml $SERVER_DIR/
 | 
				
			|||||||
COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/
 | 
					COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/
 | 
				
			||||||
COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/
 | 
					COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
RUN go get github.com/openimsdk/gomake@v0.0.9
 | 
					RUN go get github.com/openimsdk/gomake@v0.0.13
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Set the command to run when the container starts
 | 
					# Set the command to run when the container starts
 | 
				
			||||||
ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"]
 | 
					ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"]
 | 
				
			||||||
 | 
				
			|||||||
@ -51,6 +51,7 @@ services:
 | 
				
			|||||||
    ports:
 | 
					    ports:
 | 
				
			||||||
      - "12181:2181"
 | 
					      - "12181:2181"
 | 
				
			||||||
    environment:
 | 
					    environment:
 | 
				
			||||||
 | 
					      #JVMFLAGS: "-Xms32m -Xmx128m"
 | 
				
			||||||
      TZ: "Asia/Shanghai"
 | 
					      TZ: "Asia/Shanghai"
 | 
				
			||||||
      ALLOW_ANONYMOUS_LOGIN: "yes"
 | 
					      ALLOW_ANONYMOUS_LOGIN: "yes"
 | 
				
			||||||
    restart: always
 | 
					    restart: always
 | 
				
			||||||
@ -70,6 +71,7 @@ services:
 | 
				
			|||||||
    command: >
 | 
					    command: >
 | 
				
			||||||
      bash -c "/opt/bitnami/scripts/kafka/run.sh & /opt/bitnami/kafka/create-topic.sh; wait"
 | 
					      bash -c "/opt/bitnami/scripts/kafka/run.sh & /opt/bitnami/kafka/create-topic.sh; wait"
 | 
				
			||||||
    environment:
 | 
					    environment:
 | 
				
			||||||
 | 
					      #KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
 | 
				
			||||||
      TZ: Asia/Shanghai
 | 
					      TZ: Asia/Shanghai
 | 
				
			||||||
      KAFKA_CFG_NODE_ID: 0
 | 
					      KAFKA_CFG_NODE_ID: 0
 | 
				
			||||||
      KAFKA_CFG_PROCESS_ROLES: controller,broker
 | 
					      KAFKA_CFG_PROCESS_ROLES: controller,broker
 | 
				
			||||||
@ -119,3 +121,4 @@ services:
 | 
				
			|||||||
      - openim
 | 
					      - openim
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										3
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								go.mod
									
									
									
									
									
								
							@ -34,7 +34,7 @@ require (
 | 
				
			|||||||
	github.com/hashicorp/golang-lru/v2 v2.0.7
 | 
						github.com/hashicorp/golang-lru/v2 v2.0.7
 | 
				
			||||||
	github.com/kelindar/bitmap v1.5.2
 | 
						github.com/kelindar/bitmap v1.5.2
 | 
				
			||||||
	github.com/likexian/gokit v0.25.13
 | 
						github.com/likexian/gokit v0.25.13
 | 
				
			||||||
	github.com/openimsdk/gomake v0.0.11
 | 
						github.com/openimsdk/gomake v0.0.13
 | 
				
			||||||
	github.com/redis/go-redis/v9 v9.4.0
 | 
						github.com/redis/go-redis/v9 v9.4.0
 | 
				
			||||||
	github.com/robfig/cron/v3 v3.0.1
 | 
						github.com/robfig/cron/v3 v3.0.1
 | 
				
			||||||
	github.com/shirou/gopsutil v3.21.11+incompatible
 | 
						github.com/shirou/gopsutil v3.21.11+incompatible
 | 
				
			||||||
@ -44,6 +44,7 @@ require (
 | 
				
			|||||||
	golang.org/x/sync v0.6.0
 | 
						golang.org/x/sync v0.6.0
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	cloud.google.com/go v0.112.0 // indirect
 | 
						cloud.google.com/go v0.112.0 // indirect
 | 
				
			||||||
	cloud.google.com/go/compute v1.23.3 // indirect
 | 
						cloud.google.com/go/compute v1.23.3 // indirect
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							@ -266,8 +266,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
 | 
				
			|||||||
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
					github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
				
			||||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 | 
					github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 | 
				
			||||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
 | 
					github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
 | 
				
			||||||
github.com/openimsdk/gomake v0.0.11 h1:jJ9286zKFfBeARkmfqMEcUYg9lJ+Cj9lylxP8W9uCFM=
 | 
					github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
 | 
				
			||||||
github.com/openimsdk/gomake v0.0.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
					github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
					github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
				
			||||||
github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
					github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
 | 
				
			||||||
github.com/openimsdk/tools v0.0.49-alpha.5 h1:kJacyByLDjObO5w5EMPoo/JjD9D2fhSAQ5qeK2XgawI=
 | 
					github.com/openimsdk/tools v0.0.49-alpha.5 h1:kJacyByLDjObO5w5EMPoo/JjD9D2fhSAQ5qeK2XgawI=
 | 
				
			||||||
 | 
				
			|||||||
@ -25,8 +25,6 @@ import (
 | 
				
			|||||||
	"google.golang.org/grpc"
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
	"google.golang.org/grpc/credentials/insecure"
 | 
						"google.golang.org/grpc/credentials/insecure"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
	"os/signal"
 | 
					 | 
				
			||||||
	"syscall"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/openimsdk/tools/errs"
 | 
						"github.com/openimsdk/tools/errs"
 | 
				
			||||||
@ -50,22 +48,12 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
				
			|||||||
		return errs.WrapMsg(err, "failed to register discovery service")
 | 
							return errs.WrapMsg(err, "failed to register discovery service")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
						client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
				
			||||||
	ctx, exitBy := context.WithCancelCause(context.Background())
 | 
					 | 
				
			||||||
	ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
 | 
						ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
 | 
				
			||||||
	conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
 | 
						conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	cli := msg.NewMsgClient(conn)
 | 
						cli := msg.NewMsgClient(conn)
 | 
				
			||||||
	go func() {
 | 
					 | 
				
			||||||
		sigs := make(chan os.Signal, 1)
 | 
					 | 
				
			||||||
		signal.Notify(sigs, syscall.SIGTERM)
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ctx.Done():
 | 
					 | 
				
			||||||
		case s := <-sigs:
 | 
					 | 
				
			||||||
			exitBy(fmt.Errorf("exit signal %s", s))
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	crontab := cron.New()
 | 
						crontab := cron.New()
 | 
				
			||||||
	clearFunc := func() {
 | 
						clearFunc := func() {
 | 
				
			||||||
		now := time.Now()
 | 
							now := time.Now()
 | 
				
			||||||
@ -84,5 +72,5 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
 | 
				
			|||||||
	log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
 | 
						log.ZInfo(ctx, "start cron task", "chatRecordsClearTime", config.CronTask.ChatRecordsClearTime)
 | 
				
			||||||
	crontab.Start()
 | 
						crontab.Start()
 | 
				
			||||||
	<-ctx.Done()
 | 
						<-ctx.Done()
 | 
				
			||||||
	return context.Cause(ctx)
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										12
									
								
								magefile.go
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								magefile.go
									
									
									
									
									
								
							@ -6,22 +6,12 @@ package main
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"github.com/openimsdk/gomake/mageutil"
 | 
						"github.com/openimsdk/gomake/mageutil"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var Default = Build
 | 
					var Default = Build
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Build() {
 | 
					func Build() {
 | 
				
			||||||
	platforms := os.Getenv("PLATFORMS")
 | 
						mageutil.Build()
 | 
				
			||||||
	if platforms == "" {
 | 
					 | 
				
			||||||
		platforms = mageutil.DetectPlatform()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for _, platform := range strings.Split(platforms, " ") {
 | 
					 | 
				
			||||||
		mageutil.CompileForPlatform(platform)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mageutil.PrintGreen("All binaries under cmd and tools were successfully compiled.")
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Start() {
 | 
					func Start() {
 | 
				
			||||||
 | 
				
			|||||||
@ -15,14 +15,13 @@
 | 
				
			|||||||
package config
 | 
					package config
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"github.com/openimsdk/tools/db/mongoutil"
 | 
						"github.com/openimsdk/tools/db/mongoutil"
 | 
				
			||||||
	"github.com/openimsdk/tools/db/redisutil"
 | 
						"github.com/openimsdk/tools/db/redisutil"
 | 
				
			||||||
	"github.com/openimsdk/tools/mq/kafka"
 | 
						"github.com/openimsdk/tools/mq/kafka"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3/cos"
 | 
						"github.com/openimsdk/tools/s3/cos"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3/minio"
 | 
						"github.com/openimsdk/tools/s3/minio"
 | 
				
			||||||
	"github.com/openimsdk/tools/s3/oss"
 | 
						"github.com/openimsdk/tools/s3/oss"
 | 
				
			||||||
	"net"
 | 
						"strings"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -473,25 +472,23 @@ func (k *Kafka) Build() *kafka.Config {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *Minio) Build() *minio.Config {
 | 
					func (m *Minio) Build() *minio.Config {
 | 
				
			||||||
	conf := minio.Config{
 | 
						formatEndpoint := func(address string) string {
 | 
				
			||||||
 | 
							if strings.HasPrefix(address, "http://") || strings.HasPrefix(address, "https://") {
 | 
				
			||||||
 | 
								return address
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return "http://" + address
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &minio.Config{
 | 
				
			||||||
		Bucket:          m.Bucket,
 | 
							Bucket:          m.Bucket,
 | 
				
			||||||
		AccessKeyID:     m.AccessKeyID,
 | 
							AccessKeyID:     m.AccessKeyID,
 | 
				
			||||||
		SecretAccessKey: m.SecretAccessKey,
 | 
							SecretAccessKey: m.SecretAccessKey,
 | 
				
			||||||
		SessionToken:    m.SessionToken,
 | 
							SessionToken:    m.SessionToken,
 | 
				
			||||||
		PublicRead:      m.PublicRead,
 | 
							PublicRead:      m.PublicRead,
 | 
				
			||||||
 | 
							Endpoint:        formatEndpoint(m.InternalAddress),
 | 
				
			||||||
 | 
							SignEndpoint:    formatEndpoint(m.ExternalAddress),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if _, _, err := net.SplitHostPort(m.InternalAddress); err == nil {
 | 
					 | 
				
			||||||
		conf.Endpoint = fmt.Sprintf("http://%s", m.InternalAddress)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		conf.Endpoint = m.InternalAddress
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if _, _, err := net.SplitHostPort(m.ExternalAddress); err == nil {
 | 
					 | 
				
			||||||
		conf.SignEndpoint = fmt.Sprintf("http://%s", m.ExternalAddress)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		conf.SignEndpoint = m.ExternalAddress
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &conf
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (c *Cos) Build() *cos.Config {
 | 
					func (c *Cos) Build() *cos.Config {
 | 
				
			||||||
	return &cos.Config{
 | 
						return &cos.Config{
 | 
				
			||||||
 | 
				
			|||||||
@ -267,58 +267,80 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*relation.MsgInfoModel, error) {
 | 
					func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*relation.MsgInfoModel, error) {
 | 
				
			||||||
	var pipe mongo.Pipeline
 | 
						where := make(bson.A, 0, 6)
 | 
				
			||||||
	condition := bson.A{}
 | 
					 | 
				
			||||||
	if req.SendTime != "" {
 | 
					 | 
				
			||||||
		// Changed to keyed fields for bson.M to avoid govet errors
 | 
					 | 
				
			||||||
		condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if req.ContentType != 0 {
 | 
					 | 
				
			||||||
		condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if req.SessionType != 0 {
 | 
					 | 
				
			||||||
		condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if req.RecvID != "" {
 | 
						if req.RecvID != "" {
 | 
				
			||||||
		condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}})
 | 
							where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if req.SendID != "" {
 | 
						if req.SendID != "" {
 | 
				
			||||||
		condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}})
 | 
							where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if req.ContentType != 0 {
 | 
				
			||||||
	or := bson.A{
 | 
							where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
 | 
				
			||||||
		bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}},
 | 
					 | 
				
			||||||
		bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}},
 | 
					 | 
				
			||||||
		bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}},
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if req.SessionType != 0 {
 | 
				
			||||||
	// Use bson.D with keyed fields to specify the order explicitly
 | 
							where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
 | 
				
			||||||
	pipe = mongo.Pipeline{
 | 
					 | 
				
			||||||
		{{"$match", bson.D{{Key: "$or", Value: or}}}},
 | 
					 | 
				
			||||||
		{{"$project", bson.D{
 | 
					 | 
				
			||||||
			{Key: "msgs", Value: bson.D{
 | 
					 | 
				
			||||||
				{Key: "$filter", Value: bson.D{
 | 
					 | 
				
			||||||
					{Key: "input", Value: "$msgs"},
 | 
					 | 
				
			||||||
					{Key: "as", Value: "item"},
 | 
					 | 
				
			||||||
					{Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}},
 | 
					 | 
				
			||||||
				}},
 | 
					 | 
				
			||||||
			}},
 | 
					 | 
				
			||||||
			{Key: "doc_id", Value: 1},
 | 
					 | 
				
			||||||
		}}},
 | 
					 | 
				
			||||||
		{{"$unwind", bson.M{"path": "$msgs"}}},
 | 
					 | 
				
			||||||
		{{"$sort", bson.M{"msgs.msg.send_time": -1}}},
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	type docModel struct {
 | 
						if req.SendTime != "" {
 | 
				
			||||||
		DocID string                 `bson:"doc_id"`
 | 
							sendTime, err := time.Parse(time.DateOnly, req.SendTime)
 | 
				
			||||||
		Msg   *relation.MsgInfoModel `bson:"msgs"`
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							where = append(where,
 | 
				
			||||||
 | 
								bson.M{
 | 
				
			||||||
 | 
									"msgs.msg.send_time": bson.M{
 | 
				
			||||||
 | 
										"$gte": sendTime.UnixMilli(),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								bson.M{
 | 
				
			||||||
 | 
									"msgs.msg.send_time": bson.M{
 | 
				
			||||||
 | 
										"$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	msgsDocs, err := mongoutil.Aggregate[*docModel](ctx, m.coll, pipe)
 | 
						pipeline := bson.A{
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$unwind": "$msgs",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(where) > 0 {
 | 
				
			||||||
 | 
							pipeline = append(pipeline, bson.M{
 | 
				
			||||||
 | 
								"$match": bson.M{"$and": where},
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pipeline = append(pipeline,
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$project": bson.M{
 | 
				
			||||||
 | 
									"_id": 0,
 | 
				
			||||||
 | 
									"msg": "$msgs.msg",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$count": "count",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return 0, nil, err
 | 
							return 0, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	msgs := make([]*relation.MsgInfoModel, 0)
 | 
						if len(count) == 0 || count[0] == 0 {
 | 
				
			||||||
	for _, doc := range msgsDocs {
 | 
							return 0, nil, nil
 | 
				
			||||||
		msgInfo := doc.Msg
 | 
						}
 | 
				
			||||||
 | 
						pipeline = pipeline[:len(pipeline)-1]
 | 
				
			||||||
 | 
						pipeline = append(pipeline,
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							bson.M{
 | 
				
			||||||
 | 
								"$limit": req.Pagination.GetShowNumber(),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						msgs, err := mongoutil.Aggregate[*relation.MsgInfoModel](ctx, m.coll, pipeline)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return 0, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for i := range msgs {
 | 
				
			||||||
 | 
							msgInfo := msgs[i]
 | 
				
			||||||
		if msgInfo == nil || msgInfo.Msg == nil {
 | 
							if msgInfo == nil || msgInfo.Msg == nil {
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@ -350,17 +372,17 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		msgs = append(msgs, msgInfo)
 | 
							msgs = append(msgs, msgInfo)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
 | 
						//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
 | 
				
			||||||
	n := int32(len(msgs))
 | 
						//n := int32(len(msgs))
 | 
				
			||||||
	if start >= n {
 | 
						//if start >= n {
 | 
				
			||||||
		return n, []*relation.MsgInfoModel{}, nil
 | 
						//	return n, []*relation.MsgInfoModel{}, nil
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
	if start+req.Pagination.ShowNumber < n {
 | 
						//if start+req.Pagination.ShowNumber < n {
 | 
				
			||||||
		msgs = msgs[start : start+req.Pagination.ShowNumber]
 | 
						//	msgs = msgs[start : start+req.Pagination.ShowNumber]
 | 
				
			||||||
	} else {
 | 
						//} else {
 | 
				
			||||||
		msgs = msgs[start:]
 | 
						//	msgs = msgs[start:]
 | 
				
			||||||
	}
 | 
						//}
 | 
				
			||||||
	return n, msgs, nil
 | 
						return count[0], msgs, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) {
 | 
					func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) {
 | 
				
			||||||
 | 
				
			|||||||
@ -66,6 +66,7 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka,
 | 
				
			|||||||
		kafkaConfig     = &config.Kafka{}
 | 
							kafkaConfig     = &config.Kafka{}
 | 
				
			||||||
		minioConfig     = &config.Minio{}
 | 
							minioConfig     = &config.Minio{}
 | 
				
			||||||
		zookeeperConfig = &config.ZooKeeper{}
 | 
							zookeeperConfig = &config.ZooKeeper{}
 | 
				
			||||||
 | 
							thirdConfig     = &config.Third{}
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig)
 | 
						err := config.LoadConfig(filepath.Join(configDir, cmd.MongodbConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], mongoConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@ -82,11 +83,19 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka,
 | 
				
			|||||||
		return nil, nil, nil, nil, nil, err
 | 
							return nil, nil, nil, nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = config.LoadConfig(filepath.Join(configDir, cmd.MinioConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MinioConfigFileName], minioConfig)
 | 
						err = config.LoadConfig(filepath.Join(configDir, cmd.OpenIMRPCThirdCfgFileName), cmd.ConfigEnvPrefixMap[cmd.OpenIMRPCThirdCfgFileName], thirdConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, nil, nil, nil, err
 | 
							return nil, nil, nil, nil, nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if thirdConfig.Object.Enable == "minio" {
 | 
				
			||||||
 | 
							err = config.LoadConfig(filepath.Join(configDir, cmd.MinioConfigFileName), cmd.ConfigEnvPrefixMap[cmd.MinioConfigFileName], minioConfig)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return nil, nil, nil, nil, nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							minioConfig = nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig)
 | 
						err = config.LoadConfig(filepath.Join(configDir, cmd.ZookeeperConfigFileName), cmd.ConfigEnvPrefixMap[cmd.ZookeeperConfigFileName], zookeeperConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, nil, nil, nil, err
 | 
							return nil, nil, nil, nil, nil, err
 | 
				
			||||||
@ -131,14 +140,17 @@ func performChecks(ctx context.Context, mongoConfig *config.Mongo, redisConfig *
 | 
				
			|||||||
		"Redis": func() error {
 | 
							"Redis": func() error {
 | 
				
			||||||
			return CheckRedis(ctx, redisConfig)
 | 
								return CheckRedis(ctx, redisConfig)
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		"MinIO": func() error {
 | 
					 | 
				
			||||||
			return CheckMinIO(ctx, minioConfig)
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		"Kafka": func() error {
 | 
							"Kafka": func() error {
 | 
				
			||||||
			return CheckKafka(ctx, kafkaConfig)
 | 
								return CheckKafka(ctx, kafkaConfig)
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if minioConfig != nil {
 | 
				
			||||||
 | 
							checks["MinIO"] = func() error {
 | 
				
			||||||
 | 
								return CheckMinIO(ctx, minioConfig)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for i := 0; i < maxRetry; i++ {
 | 
						for i := 0; i < maxRetry; i++ {
 | 
				
			||||||
		allSuccess := true
 | 
							allSuccess := true
 | 
				
			||||||
		for name, check := range checks {
 | 
							for name, check := range checks {
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user