mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 19:32:17 +08:00 
			
		
		
		
	refactor: delete message and message batch use lua.
This commit is contained in:
		
							parent
							
								
									5d63d6d6d8
								
							
						
					
					
						commit
						268b6136e7
					
				@ -1,7 +1,6 @@
 | 
			
		||||
address: [ localhost:16379 ]
 | 
			
		||||
username: ''
 | 
			
		||||
password: openIM123
 | 
			
		||||
enablePipeline: false
 | 
			
		||||
clusterMode: false
 | 
			
		||||
db: 0
 | 
			
		||||
maxRetry: 10
 | 
			
		||||
							
								
								
									
										7
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								go.mod
									
									
									
									
									
								
							@ -1,13 +1,13 @@
 | 
			
		||||
module github.com/openimsdk/open-im-server/v3
 | 
			
		||||
 | 
			
		||||
go 1.20
 | 
			
		||||
go 1.21.2
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	firebase.google.com/go v3.13.0+incompatible
 | 
			
		||||
	github.com/dtm-labs/rockscache v0.1.1
 | 
			
		||||
	github.com/gin-gonic/gin v1.9.1
 | 
			
		||||
	github.com/go-playground/validator/v10 v10.18.0
 | 
			
		||||
	github.com/gogo/protobuf v1.3.2
 | 
			
		||||
	github.com/gogo/protobuf v1.3.2 // indirect
 | 
			
		||||
	github.com/golang-jwt/jwt/v4 v4.5.0
 | 
			
		||||
	github.com/gorilla/websocket v1.5.1
 | 
			
		||||
	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 | 
			
		||||
@ -31,6 +31,7 @@ require (
 | 
			
		||||
	github.com/IBM/sarama v1.43.0
 | 
			
		||||
	github.com/fatih/color v1.14.1
 | 
			
		||||
	github.com/go-redis/redis v6.15.9+incompatible
 | 
			
		||||
	github.com/go-redis/redismock/v9 v9.2.0
 | 
			
		||||
	github.com/hashicorp/golang-lru/v2 v2.0.7
 | 
			
		||||
	github.com/kelindar/bitmap v1.5.2
 | 
			
		||||
	github.com/likexian/gokit v0.25.13
 | 
			
		||||
@ -112,8 +113,6 @@ require (
 | 
			
		||||
	github.com/modern-go/reflect2 v1.0.2 // indirect
 | 
			
		||||
	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
 | 
			
		||||
	github.com/mozillazg/go-httpheader v0.4.0 // indirect
 | 
			
		||||
	github.com/onsi/ginkgo v1.16.5 // indirect
 | 
			
		||||
	github.com/onsi/gomega v1.18.1 // indirect
 | 
			
		||||
	github.com/pelletier/go-toml/v2 v2.1.0 // indirect
 | 
			
		||||
	github.com/pierrec/lz4/v4 v4.1.21 // indirect
 | 
			
		||||
	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										42
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								go.sum
									
									
									
									
									
								
							@ -38,9 +38,6 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
 | 
			
		||||
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
 | 
			
		||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
 | 
			
		||||
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
 | 
			
		||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 | 
			
		||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 | 
			
		||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 | 
			
		||||
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
 | 
			
		||||
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
 | 
			
		||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 | 
			
		||||
@ -83,8 +80,6 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 | 
			
		||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 | 
			
		||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
 | 
			
		||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 | 
			
		||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 | 
			
		||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 | 
			
		||||
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
 | 
			
		||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
 | 
			
		||||
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
 | 
			
		||||
@ -110,7 +105,8 @@ github.com/go-playground/validator/v10 v10.18.0 h1:BvolUXjp4zuvkZ5YN5t7ebzbhlUtP
 | 
			
		||||
github.com/go-playground/validator/v10 v10.18.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
 | 
			
		||||
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
 | 
			
		||||
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 | 
			
		||||
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 | 
			
		||||
github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
 | 
			
		||||
github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0=
 | 
			
		||||
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
 | 
			
		||||
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 | 
			
		||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
 | 
			
		||||
@ -133,7 +129,6 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
 | 
			
		||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
 | 
			
		||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 | 
			
		||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 | 
			
		||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 | 
			
		||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 | 
			
		||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 | 
			
		||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 | 
			
		||||
@ -157,7 +152,6 @@ github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17
 | 
			
		||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 | 
			
		||||
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
 | 
			
		||||
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
 | 
			
		||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 | 
			
		||||
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
 | 
			
		||||
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
 | 
			
		||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
			
		||||
@ -186,8 +180,6 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs
 | 
			
		||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 | 
			
		||||
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 | 
			
		||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 | 
			
		||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 | 
			
		||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 | 
			
		||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
 | 
			
		||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
 | 
			
		||||
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
 | 
			
		||||
@ -270,20 +262,12 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
 | 
			
		||||
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
 | 
			
		||||
github.com/mozillazg/go-httpheader v0.4.0 h1:aBn6aRXtFzyDLZ4VIRLsZbbJloagQfMnCiYgOq6hK4w=
 | 
			
		||||
github.com/mozillazg/go-httpheader v0.4.0/go.mod h1:PuT8h0pw6efvp8ZeUec1Rs7dwjK08bt6gKSReGMqtdA=
 | 
			
		||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
 | 
			
		||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
 | 
			
		||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
 | 
			
		||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 | 
			
		||||
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
 | 
			
		||||
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
 | 
			
		||||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
 | 
			
		||||
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
 | 
			
		||||
github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
 | 
			
		||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 | 
			
		||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 | 
			
		||||
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 | 
			
		||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 | 
			
		||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
 | 
			
		||||
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 | 
			
		||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0=
 | 
			
		||||
github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 | 
			
		||||
github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc=
 | 
			
		||||
@ -348,7 +332,6 @@ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
 | 
			
		||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
 | 
			
		||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 | 
			
		||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 | 
			
		||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 | 
			
		||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 | 
			
		||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 | 
			
		||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 | 
			
		||||
@ -438,18 +421,15 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 | 
			
		||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
 | 
			
		||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 | 
			
		||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 | 
			
		||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 | 
			
		||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 | 
			
		||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 | 
			
		||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 | 
			
		||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 | 
			
		||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 | 
			
		||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 | 
			
		||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 | 
			
		||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 | 
			
		||||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 | 
			
		||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 | 
			
		||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
 | 
			
		||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 | 
			
		||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 | 
			
		||||
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 | 
			
		||||
@ -467,21 +447,12 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
 | 
			
		||||
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
 | 
			
		||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 | 
			
		||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 | 
			
		||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
@ -494,7 +465,6 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
 | 
			
		||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
 | 
			
		||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 | 
			
		||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 | 
			
		||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 | 
			
		||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 | 
			
		||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
 | 
			
		||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 | 
			
		||||
@ -509,7 +479,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
 | 
			
		||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
 | 
			
		||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 | 
			
		||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 | 
			
		||||
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 | 
			
		||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 | 
			
		||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
 | 
			
		||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 | 
			
		||||
@ -556,14 +525,11 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
 | 
			
		||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 | 
			
		||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 | 
			
		||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 | 
			
		||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 | 
			
		||||
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 | 
			
		||||
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 | 
			
		||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 | 
			
		||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 | 
			
		||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 | 
			
		||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 | 
			
		||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 | 
			
		||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 | 
			
		||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
			
		||||
 | 
			
		||||
@ -81,7 +81,7 @@ func Start(ctx context.Context, index int, config *Config) error {
 | 
			
		||||
	}
 | 
			
		||||
	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
 | 
			
		||||
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
 | 
			
		||||
	msgModel := redis.NewMsgCache(rdb, config.RedisConfig.EnablePipeline)
 | 
			
		||||
	msgModel := redis.NewMsgCache(rdb)
 | 
			
		||||
	seqModel := redis.NewSeqCache(rdb)
 | 
			
		||||
	msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 | 
			
		||||
@ -89,7 +89,6 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 | 
			
		||||
			msgFromMQ.ConversationID,
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
	mc.msgDatabase.DelUserDeleteMsgsList(ctx, msgFromMQ.ConversationID, seqs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
 | 
			
		||||
 | 
			
		||||
@ -85,8 +85,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	//todo MsgCacheTimeout
 | 
			
		||||
	msgModel := redis.NewMsgCache(rdb, config.RedisConfig.EnablePipeline)
 | 
			
		||||
	msgModel := redis.NewMsgCache(rdb)
 | 
			
		||||
	seqModel := redis.NewSeqCache(rdb)
 | 
			
		||||
	conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
 | 
			
		||||
	userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
 | 
			
		||||
 | 
			
		||||
@ -323,13 +323,12 @@ type User struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Redis struct {
 | 
			
		||||
	Address        []string `mapstructure:"address"`
 | 
			
		||||
	Username       string   `mapstructure:"username"`
 | 
			
		||||
	Password       string   `mapstructure:"password"`
 | 
			
		||||
	EnablePipeline bool     `mapstructure:"enablePipeline"`
 | 
			
		||||
	ClusterMode    bool     `mapstructure:"clusterMode"`
 | 
			
		||||
	DB             int      `mapstructure:"storage"`
 | 
			
		||||
	MaxRetry       int      `mapstructure:"MaxRetry"`
 | 
			
		||||
	Address     []string `mapstructure:"address"`
 | 
			
		||||
	Username    string   `mapstructure:"username"`
 | 
			
		||||
	Password    string   `mapstructure:"password"`
 | 
			
		||||
	ClusterMode bool     `mapstructure:"clusterMode"`
 | 
			
		||||
	DB          int      `mapstructure:"storage"`
 | 
			
		||||
	MaxRetry    int      `mapstructure:"MaxRetry"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type BeforeConfig struct {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								pkg/common/storage/cache/cachekey/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								pkg/common/storage/cache/cachekey/msg.go
									
									
									
									
										vendored
									
									
								
							@ -31,10 +31,6 @@ const (
 | 
			
		||||
	reactionNotification = "EX_NOTIFICATION_"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func GetAllMessageCacheKey(conversationID string) string {
 | 
			
		||||
	return messageCache + conversationID + "_*"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetMessageCacheKey(conversationID string, seq int64) string {
 | 
			
		||||
	return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										3
									
								
								pkg/common/storage/cache/conversation.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								pkg/common/storage/cache/conversation.go
									
									
									
									
										vendored
									
									
								
							@ -52,9 +52,6 @@ type ConversationCache interface {
 | 
			
		||||
	// GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error)
 | 
			
		||||
	DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache
 | 
			
		||||
 | 
			
		||||
	GetConversationsByConversationID(ctx context.Context,
 | 
			
		||||
		conversationIDs []string) ([]*relationtb.Conversation, error)
 | 
			
		||||
	DelConversationByConversationID(conversationIDs ...string) ConversationCache
 | 
			
		||||
	GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
 | 
			
		||||
	DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										9
									
								
								pkg/common/storage/cache/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										9
									
								
								pkg/common/storage/cache/msg.go
									
									
									
									
										vendored
									
									
								
							@ -23,13 +23,8 @@ import (
 | 
			
		||||
 | 
			
		||||
type MsgCache interface {
 | 
			
		||||
	GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
 | 
			
		||||
	SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
 | 
			
		||||
	UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error
 | 
			
		||||
	DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
 | 
			
		||||
	DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error
 | 
			
		||||
	GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error)
 | 
			
		||||
	CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error
 | 
			
		||||
	DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error
 | 
			
		||||
	SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
 | 
			
		||||
	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
 | 
			
		||||
	SetSendMsgStatus(ctx context.Context, id string, status int32) error
 | 
			
		||||
	GetSendMsgStatus(ctx context.Context, id string) (int32, error)
 | 
			
		||||
	JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										43
									
								
								pkg/common/storage/cache/redis/batch_handler.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										43
									
								
								pkg/common/storage/cache/redis/batch_handler.go
									
									
									
									
										vendored
									
									
								
							@ -62,17 +62,13 @@ func (c *BatchDeleterRedis) ChainExecDel(ctx context.Context) error {
 | 
			
		||||
func (c *BatchDeleterRedis) execDel(ctx context.Context, keys []string) error {
 | 
			
		||||
	if len(keys) > 0 {
 | 
			
		||||
		log.ZDebug(ctx, "delete cache", "topic", c.redisPubTopics, "keys", keys)
 | 
			
		||||
		slotMapKeys, err := groupKeysBySlot(ctx, c.redisClient, keys)
 | 
			
		||||
		// Batch delete keys
 | 
			
		||||
		err := ProcessKeysBySlot(ctx, c.redisClient, keys, func(ctx context.Context, slot int64, keys []string) error {
 | 
			
		||||
			return c.rocksClient.TagAsDeletedBatch2(ctx, keys)
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		// Batch delete keys
 | 
			
		||||
		for slot, singleSlotKeys := range slotMapKeys {
 | 
			
		||||
			if err := c.rocksClient.TagAsDeletedBatch2(ctx, singleSlotKeys); err != nil {
 | 
			
		||||
				log.ZWarn(ctx, "Batch delete cache failed", err, "slot", slot, "keys", singleSlotKeys)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// Publish the keys that have been deleted to Redis to update the local cache information of other nodes
 | 
			
		||||
		if len(c.redisPubTopics) > 0 && len(keys) > 0 {
 | 
			
		||||
			keysByTopic := localcache.GetPublishKeysByTopic(c.redisPubTopics, keys)
 | 
			
		||||
@ -117,37 +113,6 @@ func GetRocksCacheOptions() *rockscache.Options {
 | 
			
		||||
	return &opts
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// groupKeysBySlot groups keys by their Redis cluster hash slots.
 | 
			
		||||
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
 | 
			
		||||
	slots := make(map[int64][]string)
 | 
			
		||||
	clusterClient, isCluster := redisClient.(*redis.ClusterClient)
 | 
			
		||||
	if isCluster {
 | 
			
		||||
		pipe := clusterClient.Pipeline()
 | 
			
		||||
		cmds := make([]*redis.IntCmd, len(keys))
 | 
			
		||||
		for i, key := range keys {
 | 
			
		||||
			cmds[i] = pipe.ClusterKeySlot(ctx, key)
 | 
			
		||||
		}
 | 
			
		||||
		_, err := pipe.Exec(ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, errs.WrapMsg(err, "get slot err")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for i, cmd := range cmds {
 | 
			
		||||
			slot, err := cmd.Result()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.ZWarn(ctx, "some key get slot err", err, "key", keys[i])
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			slots[slot] = append(slots[slot], keys[i])
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		// If not a cluster client, put all keys in the same slot (0)
 | 
			
		||||
		slots[0] = keys
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return slots, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
 | 
			
		||||
	var t T
 | 
			
		||||
	var write bool
 | 
			
		||||
 | 
			
		||||
@ -222,14 +222,6 @@ func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conve
 | 
			
		||||
	return cache
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConversationRedisCache) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
 | 
			
		||||
	panic("implement me")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConversationRedisCache) DelConversationByConversationID(conversationIDs ...string) cache.ConversationCache {
 | 
			
		||||
	panic("implement me")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConversationRedisCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) {
 | 
			
		||||
	return getCache(ctx, c.rcClient, c.getConversationNotReceiveMessageUserIDsKey(conversationID), c.expireTime, func(ctx context.Context) ([]string, error) {
 | 
			
		||||
		return c.conversationDB.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										102
									
								
								pkg/common/storage/cache/redis/lua_script.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								pkg/common/storage/cache/redis/lua_script.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1,102 @@
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	setBatchWithCommonExpireScript = redis.NewScript(`
 | 
			
		||||
local expire = tonumber(ARGV[1])
 | 
			
		||||
for i, key in ipairs(KEYS) do
 | 
			
		||||
    redis.call('SET', key, ARGV[i + 1])
 | 
			
		||||
    redis.call('EXPIRE', key, expire)
 | 
			
		||||
end
 | 
			
		||||
return #KEYS
 | 
			
		||||
`)
 | 
			
		||||
 | 
			
		||||
	setBatchWithIndividualExpireScript = redis.NewScript(`
 | 
			
		||||
local n = #KEYS
 | 
			
		||||
for i = 1, n do
 | 
			
		||||
    redis.call('SET', KEYS[i], ARGV[i])
 | 
			
		||||
    redis.call('EXPIRE', KEYS[i], ARGV[i + n])
 | 
			
		||||
end
 | 
			
		||||
return n
 | 
			
		||||
`)
 | 
			
		||||
 | 
			
		||||
	deleteBatchScript = redis.NewScript(`
 | 
			
		||||
for i, key in ipairs(KEYS) do
 | 
			
		||||
    redis.call('DEL', key)
 | 
			
		||||
end
 | 
			
		||||
return #KEYS
 | 
			
		||||
`)
 | 
			
		||||
 | 
			
		||||
	getBatchScript = redis.NewScript(`
 | 
			
		||||
local values = {}
 | 
			
		||||
for i, key in ipairs(KEYS) do
 | 
			
		||||
    local value = redis.call('GET', key)
 | 
			
		||||
    table.insert(values, value)
 | 
			
		||||
end
 | 
			
		||||
return values
 | 
			
		||||
`)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func callLua(ctx context.Context, rdb redis.Scripter, script *redis.Script, keys []string, args []any) (any, error) {
 | 
			
		||||
	log.ZDebug(ctx, "callLua args", "scriptHash", script.Hash(), "keys", keys, "args", args)
 | 
			
		||||
	r := script.EvalSha(ctx, rdb, keys, args)
 | 
			
		||||
	if redis.HasErrorPrefix(r.Err(), "NOSCRIPT") {
 | 
			
		||||
		if err := script.Load(ctx, rdb).Err(); err != nil {
 | 
			
		||||
			r = script.Eval(ctx, rdb, keys, args)
 | 
			
		||||
			r = script.EvalSha(ctx, rdb, keys, args)
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	v, err := r.Result()
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
		err = nil
 | 
			
		||||
	}
 | 
			
		||||
	return v, errs.WrapMsg(err, "call lua err", "scriptHash", script.Hash(), "keys", keys, "args", args)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func LuaSetBatchWithCommonExpire(ctx context.Context, rdb redis.Scripter, keys []string, values []string, expire int) error {
 | 
			
		||||
	var vals = make([]any, 0, 1+len(values))
 | 
			
		||||
	vals = append(vals, expire)
 | 
			
		||||
	for _, v := range values {
 | 
			
		||||
		vals = append(vals, v)
 | 
			
		||||
	}
 | 
			
		||||
	_, err := callLua(ctx, rdb, setBatchWithCommonExpireScript, keys, vals)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func LuaSetBatchWithIndividualExpire(ctx context.Context, rdb redis.Scripter, keys []string, values []string, expires []int) error {
 | 
			
		||||
	var vals = make([]any, 0, len(values)+len(expires))
 | 
			
		||||
	for _, v := range values {
 | 
			
		||||
		vals = append(vals, v)
 | 
			
		||||
	}
 | 
			
		||||
	for _, ex := range expires {
 | 
			
		||||
		vals = append(vals, ex)
 | 
			
		||||
	}
 | 
			
		||||
	_, err := callLua(ctx, rdb, setBatchWithIndividualExpireScript, keys, vals)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func LuaDeleteBatch(ctx context.Context, rdb redis.Scripter, keys []string) error {
 | 
			
		||||
	_, err := callLua(ctx, rdb, deleteBatchScript, keys, nil)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func LuaGetBatch(ctx context.Context, rdb redis.Scripter, keys []string) ([]any, error) {
 | 
			
		||||
	v, err := callLua(ctx, rdb, getBatchScript, keys, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	values, ok := v.([]any)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, servererrs.ErrArgs.WrapMsg("invalid lua get batch result")
 | 
			
		||||
	}
 | 
			
		||||
	return values, nil
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										75
									
								
								pkg/common/storage/cache/redis/lua_script_test.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								pkg/common/storage/cache/redis/lua_script_test.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1,75 @@
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/go-redis/redismock/v9"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	"github.com/stretchr/testify/require"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestLuaSetBatchWithCommonExpire(t *testing.T) {
 | 
			
		||||
	rdb, mock := redismock.NewClientMock()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	keys := []string{"key1", "key2"}
 | 
			
		||||
	values := []string{"value1", "value2"}
 | 
			
		||||
	expire := 10
 | 
			
		||||
 | 
			
		||||
	mock.ExpectEvalSha(setBatchWithCommonExpireScript.Hash(), keys, []any{expire, "value1", "value2"}).SetVal(int64(len(keys)))
 | 
			
		||||
 | 
			
		||||
	err := LuaSetBatchWithCommonExpire(ctx, rdb, keys, values, expire)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.NoError(t, mock.ExpectationsWereMet())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestLuaSetBatchWithIndividualExpire(t *testing.T) {
 | 
			
		||||
	rdb, mock := redismock.NewClientMock()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	keys := []string{"key1", "key2"}
 | 
			
		||||
	values := []string{"value1", "value2"}
 | 
			
		||||
	expires := []int{10, 20}
 | 
			
		||||
 | 
			
		||||
	args := make([]any, 0, len(values)+len(expires))
 | 
			
		||||
	for _, v := range values {
 | 
			
		||||
		args = append(args, v)
 | 
			
		||||
	}
 | 
			
		||||
	for _, ex := range expires {
 | 
			
		||||
		args = append(args, ex)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mock.ExpectEvalSha(setBatchWithIndividualExpireScript.Hash(), keys, args).SetVal(int64(len(keys)))
 | 
			
		||||
 | 
			
		||||
	err := LuaSetBatchWithIndividualExpire(ctx, rdb, keys, values, expires)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.NoError(t, mock.ExpectationsWereMet())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestLuaDeleteBatch(t *testing.T) {
 | 
			
		||||
	rdb, mock := redismock.NewClientMock()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	keys := []string{"key1", "key2"}
 | 
			
		||||
 | 
			
		||||
	mock.ExpectEvalSha(deleteBatchScript.Hash(), keys, []any{}).SetVal(int64(len(keys)))
 | 
			
		||||
 | 
			
		||||
	err := LuaDeleteBatch(ctx, rdb, keys)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.NoError(t, mock.ExpectationsWereMet())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestLuaGetBatch(t *testing.T) {
 | 
			
		||||
	rdb, mock := redismock.NewClientMock()
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
 | 
			
		||||
	keys := []string{"key1", "key2"}
 | 
			
		||||
	expectedValues := []any{"value1", "value2"}
 | 
			
		||||
 | 
			
		||||
	mock.ExpectEvalSha(getBatchScript.Hash(), keys, []any{}).SetVal(expectedValues)
 | 
			
		||||
 | 
			
		||||
	values, err := LuaGetBatch(ctx, rdb, keys)
 | 
			
		||||
	require.NoError(t, err)
 | 
			
		||||
	assert.NoError(t, mock.ExpectationsWereMet())
 | 
			
		||||
	assert.Equal(t, expectedValues, values)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										15
									
								
								pkg/common/storage/cache/redis/meta_cache.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										15
									
								
								pkg/common/storage/cache/redis/meta_cache.go
									
									
									
									
										vendored
									
									
								
							@ -1,15 +0,0 @@
 | 
			
		||||
// Copyright © 2023 OpenIM. All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package redis
 | 
			
		||||
							
								
								
									
										374
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										374
									
								
								pkg/common/storage/cache/redis/msg.go
									
									
									
									
										vendored
									
									
								
							@ -16,37 +16,25 @@ package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"github.com/gogo/protobuf/jsonpb"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/stringutil"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
) //
 | 
			
		||||
 | 
			
		||||
const msgCacheTimeout = 86400 * time.Second
 | 
			
		||||
// msgCacheTimeout is  expiration time of message cache, 86400 seconds
 | 
			
		||||
const msgCacheTimeout = 86400
 | 
			
		||||
 | 
			
		||||
var concurrentLimit = 3
 | 
			
		||||
 | 
			
		||||
func NewMsgCache(client redis.UniversalClient, redisEnablePipeline bool) cache.MsgCache {
 | 
			
		||||
	return &msgCache{rdb: client, msgCacheTimeout: msgCacheTimeout, redisEnablePipeline: redisEnablePipeline}
 | 
			
		||||
func NewMsgCache(client redis.UniversalClient) cache.MsgCache {
 | 
			
		||||
	return &msgCache{rdb: client}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type msgCache struct {
 | 
			
		||||
	rdb                 redis.UniversalClient
 | 
			
		||||
	msgCacheTimeout     time.Duration
 | 
			
		||||
	redisEnablePipeline bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) getAllMessageCacheKey(conversationID string) string {
 | 
			
		||||
	return cachekey.GetAllMessageCacheKey(conversationID)
 | 
			
		||||
	rdb redis.UniversalClient
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
 | 
			
		||||
@ -72,218 +60,41 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in
 | 
			
		||||
	return cachekey.GetMessageReactionExKey(clientMsgID, sessionType)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
 | 
			
		||||
	if c.redisEnablePipeline {
 | 
			
		||||
		return c.PipeSetMessageToCache(ctx, conversationID, msgs)
 | 
			
		||||
	}
 | 
			
		||||
	return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
 | 
			
		||||
	pipe := c.rdb.Pipeline()
 | 
			
		||||
	for _, msg := range msgs {
 | 
			
		||||
		s, err := msgprocessor.Pb2String(msg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return 0, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		key := c.getMessageCacheKey(conversationID, msg.Seq)
 | 
			
		||||
		_ = pipe.Set(ctx, key, s, c.msgCacheTimeout)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	results, err := pipe.Exec(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, res := range results {
 | 
			
		||||
		if res.Err() != nil {
 | 
			
		||||
			return 0, errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return len(msgs), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
 | 
			
		||||
	wg := errgroup.Group{}
 | 
			
		||||
	wg.SetLimit(concurrentLimit)
 | 
			
		||||
 | 
			
		||||
	for _, msg := range msgs {
 | 
			
		||||
		msg := msg // closure safe var
 | 
			
		||||
		wg.Go(func() error {
 | 
			
		||||
			s, err := msgprocessor.Pb2String(msg)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errs.Wrap(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			key := c.getMessageCacheKey(conversationID, msg.Seq)
 | 
			
		||||
			if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
 | 
			
		||||
				return errs.Wrap(err)
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err := wg.Wait()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, errs.WrapMsg(err, "wg.Wait failed")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return len(msgs), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error {
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		delUserListKey := c.getMessageDelUserListKey(conversationID, seq)
 | 
			
		||||
		userDelListKey := c.getUserDelList(conversationID, userID)
 | 
			
		||||
		err := c.rdb.SAdd(ctx, delUserListKey, userID).Err()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		err = c.rdb.SAdd(ctx, userDelListKey, seq).Err()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		if err := c.rdb.Expire(ctx, delUserListKey, c.msgCacheTimeout).Err(); err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		if err := c.rdb.Expire(ctx, userDelListKey, c.msgCacheTimeout).Err(); err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) {
 | 
			
		||||
	result, err := c.rdb.SMembers(ctx, c.getUserDelList(conversationID, userID)).Result()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	seqs = make([]int64, len(result))
 | 
			
		||||
	for i, v := range result {
 | 
			
		||||
		seqs[i] = stringutil.StringToInt64(v)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return seqs, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
 | 
			
		||||
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if len(delUsers) > 0 {
 | 
			
		||||
			var failedFlag bool
 | 
			
		||||
			for _, userID := range delUsers {
 | 
			
		||||
				err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err()
 | 
			
		||||
func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
 | 
			
		||||
	msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
 | 
			
		||||
		return c.getMessageCacheKey(conversationID, msg.Seq)
 | 
			
		||||
	})
 | 
			
		||||
	keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
 | 
			
		||||
		return c.getMessageCacheKey(conversationID, msg.Seq)
 | 
			
		||||
	})
 | 
			
		||||
	err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
 | 
			
		||||
		var values []string
 | 
			
		||||
		for _, key := range keys {
 | 
			
		||||
			if msg, ok := msgMap[key]; ok {
 | 
			
		||||
				s, err := msgprocessor.Pb2String(msg)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					failedFlag = true
 | 
			
		||||
					log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if !failedFlag {
 | 
			
		||||
				if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil {
 | 
			
		||||
					log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq)
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				values = append(values, s)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
 | 
			
		||||
	if c.redisEnablePipeline {
 | 
			
		||||
		return c.PipeDeleteMessages(ctx, conversationID, seqs)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return c.ParallelDeleteMessages(ctx, conversationID, seqs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) ParallelDeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
 | 
			
		||||
	wg := errgroup.Group{}
 | 
			
		||||
	wg.SetLimit(concurrentLimit)
 | 
			
		||||
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		seq := seq
 | 
			
		||||
		wg.Go(func() error {
 | 
			
		||||
			err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return errs.Wrap(err)
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) PipeDeleteMessages(ctx context.Context, conversationID string, seqs []int64) error {
 | 
			
		||||
	pipe := c.rdb.Pipeline()
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		_ = pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	results, err := pipe.Exec(ctx)
 | 
			
		||||
		return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout)
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.WrapMsg(err, "pipe.del")
 | 
			
		||||
		return 0, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, res := range results {
 | 
			
		||||
		if res.Err() != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
	return len(msgs), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error {
 | 
			
		||||
	vals, err := c.rdb.Keys(ctx, c.getAllMessageCacheKey(conversationID)).Result()
 | 
			
		||||
	if errors.Is(err, redis.Nil) {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errs.Wrap(err)
 | 
			
		||||
	}
 | 
			
		||||
	for _, v := range vals {
 | 
			
		||||
		if err := c.rdb.Del(ctx, v).Err(); err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
 | 
			
		||||
func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
 | 
			
		||||
	var keys []string
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		key := c.getMessageCacheKey(userID, seq)
 | 
			
		||||
		result, err := c.rdb.Get(ctx, key).Result()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if errors.Is(err, redis.Nil) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		var msg sdkws.MsgData
 | 
			
		||||
		err = jsonpb.UnmarshalString(result, &msg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		msg.Status = constant.MsgDeleted
 | 
			
		||||
		s, err := msgprocessor.Pb2String(&msg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		if err := c.rdb.Set(ctx, key, s, c.msgCacheTimeout).Err(); err != nil {
 | 
			
		||||
			return errs.Wrap(err)
 | 
			
		||||
		}
 | 
			
		||||
		keys = append(keys, c.getMessageCacheKey(conversationID, seq))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
	return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
 | 
			
		||||
		return LuaDeleteBatch(ctx, c.rdb, keys)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
 | 
			
		||||
@ -338,102 +149,39 @@ func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
 | 
			
		||||
	if c.redisEnablePipeline {
 | 
			
		||||
		return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
 | 
			
		||||
	pipe := c.rdb.Pipeline()
 | 
			
		||||
 | 
			
		||||
	results := []*redis.StringCmd{}
 | 
			
		||||
	var keys []string
 | 
			
		||||
	keySeqMap := make(map[string]int64, 10)
 | 
			
		||||
	for _, seq := range seqs {
 | 
			
		||||
		results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
 | 
			
		||||
		key := c.getMessageCacheKey(conversationID, seq)
 | 
			
		||||
		keys = append(keys, key)
 | 
			
		||||
		keySeqMap[key] = seq
 | 
			
		||||
	}
 | 
			
		||||
	err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
 | 
			
		||||
		result, err := LuaGetBatch(ctx, c.rdb, keys)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		for i, value := range result {
 | 
			
		||||
			seq := keySeqMap[keys[i]]
 | 
			
		||||
			if value == nil {
 | 
			
		||||
				failedSeqs = append(failedSeqs, seq)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
	_, err = pipe.Exec(ctx)
 | 
			
		||||
	if err != nil && err != redis.Nil {
 | 
			
		||||
		return seqMsgs, failedSeqs, errs.WrapMsg(err, "pipe.get")
 | 
			
		||||
			msg := &sdkws.MsgData{}
 | 
			
		||||
			msgString, ok := value.(string)
 | 
			
		||||
			if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
 | 
			
		||||
				failedSeqs = append(failedSeqs, seq)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			seqMsgs = append(seqMsgs, msg)
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return seqMsgs, failedSeqs, nil
 | 
			
		||||
 | 
			
		||||
	for idx, res := range results {
 | 
			
		||||
		seq := seqs[idx]
 | 
			
		||||
		if res.Err() != nil {
 | 
			
		||||
			log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq, "err", res.Err())
 | 
			
		||||
			failedSeqs = append(failedSeqs, seq)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		msg := sdkws.MsgData{}
 | 
			
		||||
		if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
 | 
			
		||||
			log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
 | 
			
		||||
			failedSeqs = append(failedSeqs, seq)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if msg.Status == constant.MsgDeleted {
 | 
			
		||||
			failedSeqs = append(failedSeqs, seq)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		seqMsgs = append(seqMsgs, &msg)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
 | 
			
		||||
	type entry struct {
 | 
			
		||||
		err error
 | 
			
		||||
		msg *sdkws.MsgData
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg := errgroup.Group{}
 | 
			
		||||
	wg.SetLimit(concurrentLimit)
 | 
			
		||||
 | 
			
		||||
	results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
 | 
			
		||||
	for idx, seq := range seqs {
 | 
			
		||||
		// closure safe var
 | 
			
		||||
		idx := idx
 | 
			
		||||
		seq := seq
 | 
			
		||||
 | 
			
		||||
		wg.Go(func() error {
 | 
			
		||||
			res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
 | 
			
		||||
				results[idx] = entry{err: err}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			msg := sdkws.MsgData{}
 | 
			
		||||
			if err = msgprocessor.String2Pb(res, &msg); err != nil {
 | 
			
		||||
				log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
 | 
			
		||||
				results[idx] = entry{err: err}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if msg.Status == constant.MsgDeleted {
 | 
			
		||||
				results[idx] = entry{err: err}
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			results[idx] = entry{msg: &msg}
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_ = wg.Wait()
 | 
			
		||||
 | 
			
		||||
	for idx, res := range results {
 | 
			
		||||
		if res.err != nil {
 | 
			
		||||
			failedSeqs = append(failedSeqs, seqs[idx])
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		seqMsgs = append(seqMsgs, res.msg)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										452
									
								
								pkg/common/storage/cache/redis/msg_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										452
									
								
								pkg/common/storage/cache/redis/msg_test.go
									
									
									
									
										vendored
									
									
								
							@ -4,14 +4,13 @@
 | 
			
		||||
// you may not use this file except in compliance with the License.
 | 
			
		||||
// You may obtain a copy of the License at
 | 
			
		||||
//
 | 
			
		||||
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//	http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
//
 | 
			
		||||
// Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
// See the License for the specific language governing permissions and
 | 
			
		||||
// limitations under the License.
 | 
			
		||||
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
@ -20,381 +19,106 @@ import (
 | 
			
		||||
	"github.com/openimsdk/protocol/sdkws"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestParallelSetMessageToCache(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid      = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst = rand.Int63()
 | 
			
		||||
		msgs     = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq: seqFirst + int64(i),
 | 
			
		||||
func Test_msgCache_SetMessagesToCache(t *testing.T) {
 | 
			
		||||
	type fields struct {
 | 
			
		||||
		rdb redis.UniversalClient
 | 
			
		||||
	}
 | 
			
		||||
	type args struct {
 | 
			
		||||
		ctx            context.Context
 | 
			
		||||
		conversationID string
 | 
			
		||||
		msgs           []*sdkws.MsgData
 | 
			
		||||
	}
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name    string
 | 
			
		||||
		fields  fields
 | 
			
		||||
		args    args
 | 
			
		||||
		want    int
 | 
			
		||||
		wantErr assert.ErrorAssertionFunc
 | 
			
		||||
	}{
 | 
			
		||||
		{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379"})}, args{context.Background(),
 | 
			
		||||
			"cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			c := &msgCache{
 | 
			
		||||
				rdb: tt.fields.rdb,
 | 
			
		||||
			}
 | 
			
		||||
			got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs)
 | 
			
		||||
			if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testParallelSetMessageToCache(t, cid, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testParallelSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	ret, err := cacher.ParallelSetMessageToCache(context.Background(), cid, msgs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, len(msgs), ret)
 | 
			
		||||
 | 
			
		||||
	// validate
 | 
			
		||||
	for _, msg := range msgs {
 | 
			
		||||
		key := cacher.getMessageCacheKey(cid, msg.Seq)
 | 
			
		||||
		val, err := rdb.Exists(context.Background(), key).Result()
 | 
			
		||||
		assert.Nil(t, err)
 | 
			
		||||
		assert.EqualValues(t, 1, val)
 | 
			
		||||
func Test_msgCache_GetMessagesBySeq(t *testing.T) {
 | 
			
		||||
	type fields struct {
 | 
			
		||||
		rdb redis.UniversalClient
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPipeSetMessageToCache(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid      = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst = rand.Int63()
 | 
			
		||||
		msgs     = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq: seqFirst + int64(i),
 | 
			
		||||
	type args struct {
 | 
			
		||||
		ctx            context.Context
 | 
			
		||||
		conversationID string
 | 
			
		||||
		seqs           []int64
 | 
			
		||||
	}
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name           string
 | 
			
		||||
		fields         fields
 | 
			
		||||
		args           args
 | 
			
		||||
		wantSeqMsgs    []*sdkws.MsgData
 | 
			
		||||
		wantFailedSeqs []int64
 | 
			
		||||
		wantErr        assert.ErrorAssertionFunc
 | 
			
		||||
	}{
 | 
			
		||||
		{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379"})},
 | 
			
		||||
			args{context.Background(), "cid", []int64{1, 2, 3}},
 | 
			
		||||
			[]*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, []int64{}, assert.NoError},
 | 
			
		||||
		{"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379"})},
 | 
			
		||||
			args{context.Background(), "cid", []int64{4, 5, 6}},
 | 
			
		||||
			[]*sdkws.MsgData{}, []int64{4, 5, 6}, assert.NoError},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			c := &msgCache{
 | 
			
		||||
				rdb: tt.fields.rdb,
 | 
			
		||||
			}
 | 
			
		||||
			gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs)
 | 
			
		||||
			if !tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			assert.Equalf(t, tt.wantSeqMsgs, gotSeqMsgs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)
 | 
			
		||||
			assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testPipeSetMessageToCache(t, cid, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testPipeSetMessageToCache(t *testing.T, cid string, msgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	ret, err := cacher.PipeSetMessageToCache(context.Background(), cid, msgs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, len(msgs), ret)
 | 
			
		||||
 | 
			
		||||
	// validate
 | 
			
		||||
	for _, msg := range msgs {
 | 
			
		||||
		key := cacher.getMessageCacheKey(cid, msg.Seq)
 | 
			
		||||
		val, err := rdb.Exists(context.Background(), key).Result()
 | 
			
		||||
		assert.Nil(t, err)
 | 
			
		||||
		assert.EqualValues(t, 1, val)
 | 
			
		||||
func Test_msgCache_DeleteMessagesFromCache(t *testing.T) {
 | 
			
		||||
	type fields struct {
 | 
			
		||||
		rdb redis.UniversalClient
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGetMessagesBySeq(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid      = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst = rand.Int63()
 | 
			
		||||
		msgs     = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	seqs := []int64{}
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq:    seqFirst + int64(i),
 | 
			
		||||
			SendID: fmt.Sprintf("fake-sendid-%v", i),
 | 
			
		||||
	type args struct {
 | 
			
		||||
		ctx            context.Context
 | 
			
		||||
		conversationID string
 | 
			
		||||
		seqs           []int64
 | 
			
		||||
	}
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name    string
 | 
			
		||||
		fields  fields
 | 
			
		||||
		args    args
 | 
			
		||||
		wantErr assert.ErrorAssertionFunc
 | 
			
		||||
	}{
 | 
			
		||||
		{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379"})},
 | 
			
		||||
			args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			c := &msgCache{
 | 
			
		||||
				rdb: tt.fields.rdb,
 | 
			
		||||
			}
 | 
			
		||||
			tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs),
 | 
			
		||||
				fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs))
 | 
			
		||||
		})
 | 
			
		||||
		seqs = append(seqs, seqFirst+int64(i))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// set data to cache
 | 
			
		||||
	testPipeSetMessageToCache(t, cid, msgs)
 | 
			
		||||
 | 
			
		||||
	// get data from cache with parallet mode
 | 
			
		||||
	testParallelGetMessagesBySeq(t, cid, seqs, msgs)
 | 
			
		||||
 | 
			
		||||
	// get data from cache with pipeline mode
 | 
			
		||||
	testPipeGetMessagesBySeq(t, cid, seqs, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testParallelGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, 0, len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, len(respMsgs), len(seqs))
 | 
			
		||||
 | 
			
		||||
	// validate
 | 
			
		||||
	for idx, msg := range respMsgs {
 | 
			
		||||
		assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
 | 
			
		||||
		assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testPipeGetMessagesBySeq(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, 0, len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, len(respMsgs), len(seqs))
 | 
			
		||||
 | 
			
		||||
	// validate
 | 
			
		||||
	for idx, msg := range respMsgs {
 | 
			
		||||
		assert.Equal(t, msg.Seq, inputMsgs[idx].Seq)
 | 
			
		||||
		assert.Equal(t, msg.SendID, inputMsgs[idx].SendID)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGetMessagesBySeqWithEmptySeqs(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid            = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst int64 = 0
 | 
			
		||||
		msgs           = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	seqs := []int64{}
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq:    seqFirst + int64(i),
 | 
			
		||||
			SendID: fmt.Sprintf("fake-sendid-%v", i),
 | 
			
		||||
		})
 | 
			
		||||
		seqs = append(seqs, seqFirst+int64(i))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// don't set cache, only get data from cache.
 | 
			
		||||
 | 
			
		||||
	// get data from cache with parallet mode
 | 
			
		||||
	testParallelGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
 | 
			
		||||
 | 
			
		||||
	// get data from cache with pipeline mode
 | 
			
		||||
	testPipeGetMessagesBySeqWithEmptry(t, cid, seqs, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testParallelGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, len(seqs), len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, 0, len(respMsgs))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testPipeGetMessagesBySeqWithEmptry(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Equal(t, err, redis.Nil)
 | 
			
		||||
	assert.Equal(t, len(seqs), len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, 0, len(respMsgs))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGetMessagesBySeqWithLostHalfSeqs(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid            = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst int64 = 0
 | 
			
		||||
		msgs           = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	seqs := []int64{}
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq:    seqFirst + int64(i),
 | 
			
		||||
			SendID: fmt.Sprintf("fake-sendid-%v", i),
 | 
			
		||||
		})
 | 
			
		||||
		seqs = append(seqs, seqFirst+int64(i))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Only set half the number of messages.
 | 
			
		||||
	testParallelSetMessageToCache(t, cid, msgs[:50])
 | 
			
		||||
 | 
			
		||||
	// get data from cache with parallet mode
 | 
			
		||||
	testParallelGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
 | 
			
		||||
 | 
			
		||||
	// get data from cache with pipeline mode
 | 
			
		||||
	testPipeGetMessagesBySeqWithLostHalfSeqs(t, cid, seqs, msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testParallelGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.ParallelGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, len(seqs)/2, len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, len(seqs)/2, len(respMsgs))
 | 
			
		||||
 | 
			
		||||
	for idx, msg := range respMsgs {
 | 
			
		||||
		assert.Equal(t, msg.Seq, seqs[idx])
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testPipeGetMessagesBySeqWithLostHalfSeqs(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	respMsgs, failedSeqs, err := cacher.PipeGetMessagesBySeq(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
	assert.Equal(t, len(seqs)/2, len(failedSeqs))
 | 
			
		||||
	assert.Equal(t, len(seqs)/2, len(respMsgs))
 | 
			
		||||
 | 
			
		||||
	for idx, msg := range respMsgs {
 | 
			
		||||
		assert.Equal(t, msg.Seq, seqs[idx])
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestPipeDeleteMessages(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid      = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst = rand.Int63()
 | 
			
		||||
		msgs     = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	var seqs []int64
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq: seqFirst + int64(i),
 | 
			
		||||
		})
 | 
			
		||||
		seqs = append(seqs, msgs[i].Seq)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testPipeSetMessageToCache(t, cid, msgs)
 | 
			
		||||
	testPipeDeleteMessagesOK(t, cid, seqs, msgs)
 | 
			
		||||
 | 
			
		||||
	// set again
 | 
			
		||||
	testPipeSetMessageToCache(t, cid, msgs)
 | 
			
		||||
	testPipeDeleteMessagesMix(t, cid, seqs[:90], msgs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testPipeDeleteMessagesOK(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
 | 
			
		||||
	rdb := redis.NewClient(&redis.Options{})
 | 
			
		||||
	defer rdb.Close()
 | 
			
		||||
 | 
			
		||||
	cacher := msgCache{rdb: rdb}
 | 
			
		||||
 | 
			
		||||
	err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
 | 
			
		||||
	assert.Nil(t, err)
 | 
			
		||||
 | 
			
		||||
	// validate
 | 
			
		||||
	for _, msg := range inputMsgs {
 | 
			
		||||
		key := cacher.getMessageCacheKey(cid, msg.Seq)
 | 
			
		||||
		val := rdb.Exists(context.Background(), key).Val()
 | 
			
		||||
		assert.EqualValues(t, 0, val)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// testPipeDeleteMessagesMix pipeline-deletes the given seqs and then checks
// that exactly the first 90 input messages were removed from Redis while the
// remaining ones are still cached.
//
// NOTE(review): the 90 cutoff is hard-coded to match the caller
// (TestPipeDeleteMessages passes seqs[:90]); the parallel variant
// generalizes this via a lessValNonExists parameter — consider aligning.
func testPipeDeleteMessagesMix(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
	rdb := redis.NewClient(&redis.Options{})
	defer rdb.Close()

	cacher := msgCache{rdb: rdb}

	err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
	assert.Nil(t, err)

	// validate: first 90 keys deleted, the rest untouched
	for idx, msg := range inputMsgs {
		key := cacher.getMessageCacheKey(cid, msg.Seq)
		val, err := rdb.Exists(context.Background(), key).Result()
		assert.Nil(t, err)
		if idx < 90 {
			assert.EqualValues(t, 0, val) // not exists
			continue
		}

		assert.EqualValues(t, 1, val) // exists
	}
}
 | 
			
		||||
 | 
			
		||||
func TestParallelDeleteMessages(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		cid      = fmt.Sprintf("cid-%v", rand.Int63())
 | 
			
		||||
		seqFirst = rand.Int63()
 | 
			
		||||
		msgs     = []*sdkws.MsgData{}
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	var seqs []int64
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		msgs = append(msgs, &sdkws.MsgData{
 | 
			
		||||
			Seq: seqFirst + int64(i),
 | 
			
		||||
		})
 | 
			
		||||
		seqs = append(seqs, msgs[i].Seq)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	randSeqs := []int64{}
 | 
			
		||||
	for i := seqFirst + 100; i < seqFirst+200; i++ {
 | 
			
		||||
		randSeqs = append(randSeqs, i)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	testParallelSetMessageToCache(t, cid, msgs)
 | 
			
		||||
	testParallelDeleteMessagesOK(t, cid, seqs, msgs)
 | 
			
		||||
 | 
			
		||||
	// set again
 | 
			
		||||
	testParallelSetMessageToCache(t, cid, msgs)
 | 
			
		||||
	testParallelDeleteMessagesMix(t, cid, seqs[:90], msgs, 90)
 | 
			
		||||
	testParallelDeleteMessagesOK(t, cid, seqs[90:], msgs[:90])
 | 
			
		||||
 | 
			
		||||
	// set again
 | 
			
		||||
	testParallelSetMessageToCache(t, cid, msgs)
 | 
			
		||||
	testParallelDeleteMessagesMix(t, cid, randSeqs, msgs, 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// testParallelDeleteMessagesOK deletes the given seqs for conversation cid
// and asserts that none of inputMsgs' cache keys remain in Redis.
//
// NOTE(review): despite the "Parallel" name, this helper calls
// PipeDeleteMessages — identical to testPipeDeleteMessagesOK. Confirm
// whether a parallel delete method was intended here.
func testParallelDeleteMessagesOK(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData) {
	rdb := redis.NewClient(&redis.Options{})
	defer rdb.Close()

	cacher := msgCache{rdb: rdb}

	err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
	assert.Nil(t, err)

	// validate: every message key must have been removed
	for _, msg := range inputMsgs {
		key := cacher.getMessageCacheKey(cid, msg.Seq)
		val := rdb.Exists(context.Background(), key).Val()
		assert.EqualValues(t, 0, val)
	}
}
 | 
			
		||||
 | 
			
		||||
// testParallelDeleteMessagesMix deletes the given seqs and then checks that
// exactly the first lessValNonExists input messages are gone from Redis
// while the rest are still cached.
//
// NOTE(review): this helper also calls PipeDeleteMessages despite its
// "Parallel" name — confirm the intended delete method.
func testParallelDeleteMessagesMix(t *testing.T, cid string, seqs []int64, inputMsgs []*sdkws.MsgData, lessValNonExists int) {
	rdb := redis.NewClient(&redis.Options{})
	defer rdb.Close()

	cacher := msgCache{rdb: rdb}

	err := cacher.PipeDeleteMessages(context.Background(), cid, seqs)
	assert.Nil(t, err)

	// validate: keys below the cutoff removed, keys above it untouched
	for idx, msg := range inputMsgs {
		key := cacher.getMessageCacheKey(cid, msg.Seq)
		val, err := rdb.Exists(context.Background(), key).Result()
		assert.Nil(t, err)
		if idx < lessValNonExists {
			assert.EqualValues(t, 0, val) // not exists
			continue
		}

		assert.EqualValues(t, 1, val) // exists
	}
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										197
									
								
								pkg/common/storage/cache/redis/redis_shard_manager.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										197
									
								
								pkg/common/storage/cache/redis/redis_shard_manager.go
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@ -0,0 +1,197 @@
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"github.com/openimsdk/tools/errs"
 | 
			
		||||
	"github.com/openimsdk/tools/log"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
	// defaultBatchSize is the number of keys handed to processFunc per batch
	// when not overridden via WithBatchSize.
	defaultBatchSize = 50
	// defaultConcurrentLimit is the number of batches processed concurrently
	// when not overridden via WithConcurrentLimit.
	defaultConcurrentLimit = 3
)
 | 
			
		||||
 | 
			
		||||
// RedisShardManager groups keys by their Redis cluster hash slot and
// processes them in concurrent, size-bounded batches.
type RedisShardManager struct {
	redisClient redis.UniversalClient // cluster or standalone client
	config      *Config               // batching/concurrency settings
}

// Config holds the batching and concurrency knobs used when processing keys.
type Config struct {
	batchSize       int  // maximum keys per batch handed to the process function
	continueOnError bool // true: log a failed batch and keep going; false: abort the run
	concurrentLimit int  // maximum number of batches processed concurrently
}

// Option is a function type for configuring Config.
type Option func(c *Config)
 | 
			
		||||
 | 
			
		||||
// NewRedisShardManager creates a new RedisShardManager instance
 | 
			
		||||
func NewRedisShardManager(redisClient redis.UniversalClient, opts ...Option) *RedisShardManager {
 | 
			
		||||
	config := &Config{
 | 
			
		||||
		batchSize:       defaultBatchSize, // Default batch size is 50 keys
 | 
			
		||||
		continueOnError: false,
 | 
			
		||||
		concurrentLimit: defaultConcurrentLimit, // Default concurrent limit is 3
 | 
			
		||||
	}
 | 
			
		||||
	for _, opt := range opts {
 | 
			
		||||
		opt(config)
 | 
			
		||||
	}
 | 
			
		||||
	rsm := &RedisShardManager{
 | 
			
		||||
		redisClient: redisClient,
 | 
			
		||||
		config:      config,
 | 
			
		||||
	}
 | 
			
		||||
	return rsm
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithBatchSize sets the number of keys to process per batch
 | 
			
		||||
func WithBatchSize(size int) Option {
 | 
			
		||||
	return func(c *Config) {
 | 
			
		||||
		c.batchSize = size
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithContinueOnError sets whether to continue processing on error
 | 
			
		||||
func WithContinueOnError(continueOnError bool) Option {
 | 
			
		||||
	return func(c *Config) {
 | 
			
		||||
		c.continueOnError = continueOnError
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithConcurrentLimit sets the concurrency limit
 | 
			
		||||
func WithConcurrentLimit(limit int) Option {
 | 
			
		||||
	return func(c *Config) {
 | 
			
		||||
		c.concurrentLimit = limit
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProcessKeysBySlot groups keys by their Redis cluster hash slots and
// processes them using the provided function. Batches run concurrently
// (bounded by config.concurrentLimit); a batch error either aborts the whole
// run or is merely logged, depending on config.continueOnError.
//
// processFunc receives all keys of one batch, which are guaranteed to share
// a single hash slot.
func (rsm *RedisShardManager) ProcessKeysBySlot(
	ctx context.Context,
	keys []string,
	processFunc func(ctx context.Context, slot int64, keys []string) error,
) error {

	// Group keys by slot
	slots, err := groupKeysBySlot(ctx, rsm.redisClient, keys)
	if err != nil {
		return err
	}

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(rsm.config.concurrentLimit)

	// Process keys in each slot using the provided function
	for slot, singleSlotKeys := range slots {
		batches := splitIntoBatches(singleSlotKeys, rsm.config.batchSize)
		for _, batch := range batches {
			slot, batch := slot, batch // Avoid closure capture issue
			g.Go(func() error {
				err := processFunc(ctx, slot, batch)
				if err != nil {
					log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
					if !rsm.config.continueOnError {
						// Abort the run; errgroup cancels ctx for sibling batches.
						return err
					}
				}
				return nil
			})
		}
	}

	if err := g.Wait(); err != nil {
		return err
	}
	return nil
}
 | 
			
		||||
 | 
			
		||||
// groupKeysBySlot groups keys by their Redis cluster hash slots.
//
// For a cluster client it resolves each key's slot with a single pipelined
// round of CLUSTER KEYSLOT commands and fails if any key cannot be resolved.
// For a non-cluster client there is no slot concept, so all keys are placed
// under the synthetic slot 0.
func groupKeysBySlot(ctx context.Context, redisClient redis.UniversalClient, keys []string) (map[int64][]string, error) {
	slots := make(map[int64][]string)
	clusterClient, isCluster := redisClient.(*redis.ClusterClient)
	if isCluster {
		// One pipelined CLUSTER KEYSLOT per key; cmds[i] matches keys[i].
		pipe := clusterClient.Pipeline()
		cmds := make([]*redis.IntCmd, len(keys))
		for i, key := range keys {
			cmds[i] = pipe.ClusterKeySlot(ctx, key)
		}
		_, err := pipe.Exec(ctx)
		if err != nil {
			return nil, errs.WrapMsg(err, "get slot err")
		}

		for i, cmd := range cmds {
			slot, err := cmd.Result()
			if err != nil {
				log.ZWarn(ctx, "some key get slot err", err, "key", keys[i])
				return nil, errs.WrapMsg(err, "get slot err", "key", keys[i])
			}
			slots[slot] = append(slots[slot], keys[i])
		}
	} else {
		// If not a cluster client, put all keys in the same slot (0)
		slots[0] = keys
	}

	return slots, nil
}
 | 
			
		||||
 | 
			
		||||
// splitIntoBatches splits keys into consecutive batches of at most batchSize
// elements. The final batch holds the remainder and may be shorter; it is
// emitted even when keys is empty (matching the previous behavior, callers
// always receive at least one batch).
//
// Fix: the previous implementation looped forever when batchSize <= 0; such
// a value now yields all keys as a single batch. The result slice is also
// pre-sized to avoid repeated growth.
func splitIntoBatches(keys []string, batchSize int) [][]string {
	if batchSize <= 0 {
		return [][]string{keys}
	}
	// ceil-ish pre-size: full batches plus the trailing remainder batch.
	batches := make([][]string, 0, len(keys)/batchSize+1)
	for len(keys) > batchSize {
		// Three-index slice caps the batch so later appends elsewhere
		// cannot stomp the shared backing array.
		batches = append(batches, keys[:batchSize:batchSize])
		keys = keys[batchSize:]
	}
	return append(batches, keys)
}
 | 
			
		||||
 | 
			
		||||
// ProcessKeysBySlot groups keys by their Redis cluster hash slots and
// processes them using the provided function. It is the standalone
// counterpart of (*RedisShardManager).ProcessKeysBySlot: a fresh default
// Config (batch size 50, concurrency 3, abort on first error) is built per
// call and then adjusted by opts.
//
// processFunc receives all keys of one batch, which are guaranteed to share
// a single hash slot.
func ProcessKeysBySlot(
	ctx context.Context,
	redisClient redis.UniversalClient,
	keys []string,
	processFunc func(ctx context.Context, slot int64, keys []string) error,
	opts ...Option,
) error {

	config := &Config{
		batchSize:       defaultBatchSize,
		continueOnError: false,
		concurrentLimit: defaultConcurrentLimit,
	}
	for _, opt := range opts {
		opt(config)
	}

	// Group keys by slot
	slots, err := groupKeysBySlot(ctx, redisClient, keys)
	if err != nil {
		return err
	}

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(config.concurrentLimit)

	// Process keys in each slot using the provided function
	for slot, singleSlotKeys := range slots {
		batches := splitIntoBatches(singleSlotKeys, config.batchSize)
		for _, batch := range batches {
			slot, batch := slot, batch // Avoid closure capture issue
			g.Go(func() error {
				err := processFunc(ctx, slot, batch)
				if err != nil {
					log.ZWarn(ctx, "Batch processFunc failed", err, "slot", slot, "keys", batch)
					if !config.continueOnError {
						// Abort the run; errgroup cancels ctx for sibling batches.
						return err
					}
				}
				return nil
			})
		}
	}

	if err := g.Wait(); err != nil {
		return err
	}
	return nil
}
 | 
			
		||||
							
								
								
									
										6
									
								
								pkg/common/storage/cache/user.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								pkg/common/storage/cache/user.go
									
									
									
									
										vendored
									
									
								
							@ -16,15 +16,15 @@ package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/protocol/user"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type UserCache interface {
 | 
			
		||||
	BatchDeleter
 | 
			
		||||
	CloneUserCache() UserCache
 | 
			
		||||
	GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.User, err error)
 | 
			
		||||
	GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.User, error)
 | 
			
		||||
	GetUserInfo(ctx context.Context, userID string) (userInfo *model.User, err error)
 | 
			
		||||
	GetUsersInfo(ctx context.Context, userIDs []string) ([]*model.User, error)
 | 
			
		||||
	DelUsersInfo(userIDs ...string) UserCache
 | 
			
		||||
	GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
 | 
			
		||||
	DelUsersGlobalRecvMsgOpt(userIDs ...string) UserCache
 | 
			
		||||
 | 
			
		||||
@ -54,8 +54,6 @@ type CommonMsgDatabase interface {
 | 
			
		||||
	MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
 | 
			
		||||
	// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
 | 
			
		||||
	DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
 | 
			
		||||
	// DelUserDeleteMsgsList deletes user's message deletion list.
 | 
			
		||||
	DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64)
 | 
			
		||||
	// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
 | 
			
		||||
	BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
 | 
			
		||||
	// GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers.
 | 
			
		||||
@ -347,11 +345,7 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
 | 
			
		||||
	return db.msg.DeleteMessages(ctx, conversationID, seqs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversationID string, seqs []int64) {
 | 
			
		||||
	db.msg.DelUserDeleteMsgsList(ctx, conversationID, seqs)
 | 
			
		||||
	return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
 | 
			
		||||
@ -378,7 +372,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
 | 
			
		||||
		userSeqMap[m.SendID] = m.Seq
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	failedNum, err := db.msg.SetMessageToCache(ctx, conversationID, msgs)
 | 
			
		||||
	failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
 | 
			
		||||
		log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
 | 
			
		||||
@ -583,59 +577,17 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	var successMsgs []*sdkws.MsgData
 | 
			
		||||
	if len(cachedMsgs) > 0 {
 | 
			
		||||
		delSeqs, err := db.msg.GetUserDelList(ctx, userID, conversationID)
 | 
			
		||||
		if err != nil && errs.Unwrap(err) != redis.Nil {
 | 
			
		||||
			return 0, 0, nil, err
 | 
			
		||||
		}
 | 
			
		||||
		var cacheDelNum int
 | 
			
		||||
		for _, msg := range cachedMsgs {
 | 
			
		||||
			if !datautil.Contain(msg.Seq, delSeqs...) {
 | 
			
		||||
				successMsgs = append(successMsgs, msg)
 | 
			
		||||
			} else {
 | 
			
		||||
				cacheDelNum += 1
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum)
 | 
			
		||||
		var reGetSeqsCache []int64
 | 
			
		||||
		for i := 1; i <= cacheDelNum; {
 | 
			
		||||
			newSeq := newBegin - int64(i)
 | 
			
		||||
			if newSeq >= begin {
 | 
			
		||||
				if !datautil.Contain(newSeq, delSeqs...) {
 | 
			
		||||
					log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq)
 | 
			
		||||
					reGetSeqsCache = append(reGetSeqsCache, newSeq)
 | 
			
		||||
					i++
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if len(reGetSeqsCache) > 0 {
 | 
			
		||||
			log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache)
 | 
			
		||||
			cachedMsgs, failedSeqs2, err := db.msg.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				if err != redis.Nil {
 | 
			
		||||
 | 
			
		||||
					log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			failedSeqs = append(failedSeqs, failedSeqs2...)
 | 
			
		||||
			successMsgs = append(successMsgs, cachedMsgs...)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
 | 
			
		||||
	if len(failedSeqs) != 0 {
 | 
			
		||||
		log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs)
 | 
			
		||||
	}
 | 
			
		||||
	log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs)
 | 
			
		||||
	// get from cache or db
 | 
			
		||||
 | 
			
		||||
	if len(failedSeqs) > 0 {
 | 
			
		||||
		log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs)
 | 
			
		||||
		mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 | 
			
		||||
			return 0, 0, nil, err
 | 
			
		||||
		}
 | 
			
		||||
		successMsgs = append(mongoMsgs, successMsgs...)
 | 
			
		||||
		successMsgs = append(mongoMsgs, cachedMsgs...)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return minSeq, maxSeq, successMsgs, nil
 | 
			
		||||
@ -695,12 +647,6 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
 | 
			
		||||
	if minSeq == 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if remainTime == 0 {
 | 
			
		||||
		err = db.msg.CleanUpOneConversationAllMsg(ctx, conversationID)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return db.seq.SetMinSeq(ctx, conversationID, minSeq)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -820,7 +766,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
 | 
			
		||||
	if err := db.msg.DeleteMessages(ctx, conversationID, allSeqs); err != nil {
 | 
			
		||||
	if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
 | 
			
		||||
@ -836,21 +782,9 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
 | 
			
		||||
	cachedMsgs, _, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs)
 | 
			
		||||
	if err != nil && errs.Unwrap(err) != redis.Nil {
 | 
			
		||||
		log.ZWarn(ctx, "DeleteUserMsgsBySeqs", err, "conversationID", conversationID, "seqs", seqs)
 | 
			
		||||
	if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if len(cachedMsgs) > 0 {
 | 
			
		||||
		var cacheSeqs []int64
 | 
			
		||||
		for _, msg := range cachedMsgs {
 | 
			
		||||
			cacheSeqs = append(cacheSeqs, msg.Seq)
 | 
			
		||||
		}
 | 
			
		||||
		if err := db.msg.UserDeleteMsgs(ctx, conversationID, cacheSeqs, userID); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
 | 
			
		||||
		for _, seq := range seqs {
 | 
			
		||||
			if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil {
 | 
			
		||||
 | 
			
		||||
@ -5,6 +5,7 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
 | 
			
		||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
 | 
			
		||||
	"github.com/openimsdk/tools/utils/datautil"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/openimsdk/protocol/constant"
 | 
			
		||||
@ -108,29 +109,11 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
 | 
			
		||||
					{Key: "input", Value: indexs},
 | 
			
		||||
					{Key: "as", Value: "index"},
 | 
			
		||||
					{Key: "in", Value: bson.D{
 | 
			
		||||
						{Key: "$let", Value: bson.D{
 | 
			
		||||
							{Key: "vars", Value: bson.D{
 | 
			
		||||
								{Key: "currentMsg", Value: bson.D{
 | 
			
		||||
									{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
 | 
			
		||||
								}},
 | 
			
		||||
							}},
 | 
			
		||||
							{Key: "in", Value: bson.D{
 | 
			
		||||
								{Key: "$cond", Value: bson.D{
 | 
			
		||||
									{Key: "if", Value: bson.D{
 | 
			
		||||
										{Key: "$in", Value: bson.A{userID, "$$currentMsg.del_list"}},
 | 
			
		||||
									}},
 | 
			
		||||
									{Key: "then", Value: nil},
 | 
			
		||||
									{Key: "else", Value: "$$currentMsg"},
 | 
			
		||||
								}},
 | 
			
		||||
							}},
 | 
			
		||||
						}},
 | 
			
		||||
						{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
 | 
			
		||||
					}},
 | 
			
		||||
				}},
 | 
			
		||||
			}},
 | 
			
		||||
		}}},
 | 
			
		||||
		bson.D{{Key: "$project", Value: bson.D{
 | 
			
		||||
			{Key: "msgs.del_list", Value: 0},
 | 
			
		||||
		}}},
 | 
			
		||||
	}
 | 
			
		||||
	msgDocModel, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@ -145,6 +128,10 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
 | 
			
		||||
		if msg == nil || msg.Msg == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if datautil.Contain(userID, msg.DelList...) {
 | 
			
		||||
			msg.Msg.Content = ""
 | 
			
		||||
			msg.Msg.Status = constant.MsgDeleted
 | 
			
		||||
		}
 | 
			
		||||
		if msg.Revoke != nil {
 | 
			
		||||
			revokeContent := sdkws.MessageRevokedContent{
 | 
			
		||||
				RevokerID:                   msg.Revoke.UserID,
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user