From aaf898567f85cd57ba536dfd33ab9bd5f8e8c634 Mon Sep 17 00:00:00 2001
From: withchao <993506633@qq.com>
Date: Fri, 7 Mar 2025 15:51:47 +0800
Subject: [PATCH] solve uncorrect notification when set group info

---
 go.mod                                        |  26 ++-
 go.sum                                        |  71 ++++++-
 internal/api/msg.go                           |   2 +-
 internal/msggateway/ws_server.go              |  15 +-
 .../msgtransfer/online_history_msg_handler.go |   2 +-
 .../online_msg_to_mongo_handler.go            |   2 +-
 internal/push/offlinepush_handler.go          |   2 +-
 internal/push/onlinepusher.go                 |   5 +-
 internal/push/push_handler.go                 |   2 +-
 internal/rpc/auth/auth.go                     |   2 +-
 internal/rpc/conversation/notification.go     |   7 +-
 internal/rpc/group/group.go                   |   5 +-
 internal/rpc/group/notification.go            |  10 +-
 internal/rpc/msg/notification.go              |  10 +-
 internal/rpc/msg/server.go                    |   9 +-
 internal/rpc/relation/notification.go         |   5 +-
 internal/rpc/third/s3.go                      |   8 +-
 internal/rpc/user/notification.go             |   5 +-
 pkg/common/config/config.go                   |   2 +-
 .../discoveryregister/discoveryregister.go    |   9 +-
 .../discoveryregister/kubernetes/doc.go       |  15 --
 .../kubernetes/kubernetes.go                  | 199 ------------------
 pkg/common/startrpc/start.go                  |   1 +
 pkg/common/storage/controller/msg.go          |   2 +-
 pkg/common/storage/controller/msg_transfer.go |   3 +-
 pkg/common/storage/controller/push.go         |   2 +-
 pkg/common/storage/controller/s3.go           |   4 +-
 pkg/common/storage/kafka/config.go            |  33 +++
 pkg/common/storage/kafka/consumer_group.go    |  68 ++++++
 pkg/common/storage/kafka/producer.go          |  82 ++++++++
 pkg/common/storage/kafka/sarama.go            |  85 ++++++++
 pkg/common/storage/kafka/tls.go               |  83 ++++++++
 pkg/common/storage/kafka/util.go              |  34 +++
 pkg/common/storage/kafka/verify.go            |  79 +++++++
 pkg/notification/msg.go                       |   5 +-
 tools/check-component/main.go                 |   2 +-
 tools/seq/internal/main.go                    |   2 +-
 37 files changed, 623 insertions(+), 275 deletions(-)
 delete mode 100644 pkg/common/discoveryregister/kubernetes/doc.go
 delete mode 100644 pkg/common/discoveryregister/kubernetes/kubernetes.go
 create mode 100644 pkg/common/storage/kafka/config.go
 create mode 100644 pkg/common/storage/kafka/consumer_group.go
 create mode 100644 pkg/common/storage/kafka/producer.go
 create mode 100644 pkg/common/storage/kafka/sarama.go
 create mode 100644 pkg/common/storage/kafka/tls.go
 create mode 100644 pkg/common/storage/kafka/util.go
 create mode 100644 pkg/common/storage/kafka/verify.go

diff --git a/go.mod b/go.mod
index ccd9885d0..5a2b0c263 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,6 @@ require (
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/shirou/gopsutil v3.21.11+incompatible
 	github.com/spf13/viper v1.18.2
-	github.com/stathat/consistent v1.0.0
 	go.etcd.io/etcd/client/v3 v3.5.13
 	go.uber.org/automaxprocs v1.5.3
 	golang.org/x/sync v0.8.0
@@ -88,19 +87,27 @@ require (
 	github.com/eapache/go-resiliency v1.6.0 // indirect
 	github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
 	github.com/eapache/queue v1.1.0 // indirect
+	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
+	github.com/fxamacker/cbor/v2 v2.7.0 // indirect
 	github.com/gabriel-vasile/mimetype v1.4.3 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
 	github.com/go-logr/logr v1.4.2 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-ole/go-ole v1.2.6 // indirect
+	github.com/go-openapi/jsonpointer v0.19.6 // indirect
+	github.com/go-openapi/jsonreference v0.20.2 // indirect
+	github.com/go-openapi/swag v0.22.4 // indirect
 	github.com/go-playground/universal-translator v0.18.1 // indirect
 	github.com/go-zookeeper/zk v1.0.3 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/gnostic-models v0.6.8 // indirect
+	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/google/go-querystring v1.1.0 // indirect
+	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/s2a-go v0.1.7 // indirect
 	github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
 	github.com/googleapis/gax-go/v2 v2.12.3 // indirect
@@ -117,6 +124,7 @@ require (
 	github.com/jinzhu/copier v0.4.0 // indirect
 	github.com/jinzhu/inflection v1.0.0 // indirect
 	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/kelindar/simd v1.1.2 // indirect
 	github.com/klauspost/compress v1.17.7 // indirect
@@ -126,6 +134,7 @@ require (
 	github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
 	github.com/magefile/mage v1.15.0 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
+	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
 	github.com/minio/md5-simd v1.1.2 // indirect
@@ -135,6 +144,7 @@ require (
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
 	github.com/mozillazg/go-httpheader v0.4.0 // indirect
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
 	github.com/pierrec/lz4/v4 v4.1.21 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@@ -156,6 +166,7 @@ require (
 	github.com/tklauser/go-sysconf v0.3.13 // indirect
 	github.com/tklauser/numcpus v0.7.0 // indirect
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+	github.com/x448/float16 v0.8.4 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect
@@ -177,14 +188,25 @@ require (
 	golang.org/x/net v0.29.0 // indirect
 	golang.org/x/oauth2 v0.23.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
+	golang.org/x/term v0.24.0 // indirect
 	golang.org/x/text v0.18.0 // indirect
 	golang.org/x/time v0.5.0 // indirect
 	google.golang.org/appengine/v2 v2.0.2 // indirect
 	google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
+	gopkg.in/inf.v0 v0.9.1 // indirect
+	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gorm.io/gorm v1.25.8 // indirect
-	stathat.com/c/consistent v1.0.0 // indirect
+	k8s.io/api v0.31.2 // indirect
+	k8s.io/apimachinery v0.31.2 // indirect
+	k8s.io/client-go v0.31.2 // indirect
+	k8s.io/klog/v2 v2.130.1 // indirect
+	k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
+	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
+	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
+	sigs.k8s.io/yaml v1.4.0 // indirect
 )
 
 require (
diff --git a/go.sum b/go.sum
index 18d2a6495..af3515763 100644
--- a/go.sum
+++ b/go.sum
@@ -103,6 +103,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
 github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
+github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -117,6 +119,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk
 github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
 github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
 github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
+github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
+github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
 github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
 github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
 github.com/gin-contrib/gzip v1.0.1 h1:HQ8ENHODeLY7a4g1Au/46Z92bdGFl74OhxcZble9WJE=
@@ -132,6 +136,13 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
 github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
+github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
+github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
+github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
+github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
+github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
+github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogBU=
+github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
 github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
 github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -150,6 +161,8 @@ github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGK
 github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-redis/redismock/v9 v9.2.0 h1:ZrMYQeKPECZPjOj5u9eyOjg8Nnb0BS9lkVIZ6IpsKLw=
 github.com/go-redis/redismock/v9 v9.2.0/go.mod h1:18KHfGDK4Y6c2R0H38EUGWAdc7ZQS9gfYxc94k7rWT0=
+github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
+github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
 github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
 github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
@@ -179,6 +192,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
 github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
+github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -186,14 +201,19 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
 github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
+github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
 github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
+github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM=
+github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
 github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
 github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -244,6 +264,8 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
 github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
+github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
+github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/kelindar/bitmap v1.5.2 h1:XwX7CTvJtetQZ64zrOkApoZZHBJRkjE23NfqUALA/HE=
@@ -285,6 +307,8 @@ github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
 github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
 github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
 github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
+github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
+github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
 github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
 github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@@ -311,14 +335,18 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
 github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
 github.com/mozillazg/go-httpheader v0.4.0 h1:aBn6aRXtFzyDLZ4VIRLsZbbJloagQfMnCiYgOq6hK4w=
 github.com/mozillazg/go-httpheader v0.4.0/go.mod h1:PuT8h0pw6efvp8ZeUec1Rs7dwjK08bt6gKSReGMqtdA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
 github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
 github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
 github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
+github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
+github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
 github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
-github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
-github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
+github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
+github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 github.com/openimsdk/protocol v0.0.72-alpha.78 h1:n9HVj5olMPiGLF3Z4apPvvYzn2yOpyrsn2/YiAaIsxw=
 github.com/openimsdk/protocol v0.0.72-alpha.78/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
 github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc=
@@ -356,8 +384,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
 github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
-github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
-github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
+github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
 github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -379,8 +407,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
 github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
 github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
-github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
-github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -410,6 +436,8 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
 github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
 github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
 github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
+github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
+github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
 github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
@@ -449,8 +477,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
 go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
 go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
-go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
-go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
@@ -527,6 +555,8 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
+golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -548,6 +578,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -592,11 +624,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
+gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -607,7 +642,23 @@ gorm.io/gorm v1.25.8 h1:WAGEZ/aEcznN4D03laj8DKnehe1e9gYQAjW8xyPRdeo=
 gorm.io/gorm v1.25.8/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+k8s.io/api v0.31.2 h1:3wLBbL5Uom/8Zy98GRPXpJ254nEFpl+hwndmk9RwmL0=
+k8s.io/api v0.31.2/go.mod h1:bWmGvrGPssSK1ljmLzd3pwCQ9MgoTsRCuK35u6SygUk=
+k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw=
+k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
+k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc=
+k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs=
+k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
+k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
+k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
+k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
+k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
+k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
 nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
 rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
-stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
-stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
+sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
+sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
+sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
+sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
+sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
+sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
diff --git a/internal/api/msg.go b/internal/api/msg.go
index fc235354c..4fe950ffa 100644
--- a/internal/api/msg.go
+++ b/internal/api/msg.go
@@ -281,7 +281,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
 				IsSendMsg:        false,
 				ReliabilityLevel: 1,
 				UnreadCount:      false,
-			}),
+			}, nil),
 		},
 	}
 	respPb, err := m.Client.SendMsg(c, &sendMsgReq)
diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go
index 208289e9e..a18ad0058 100644
--- a/internal/msggateway/ws_server.go
+++ b/internal/msggateway/ws_server.go
@@ -3,12 +3,13 @@ package msggateway
 import (
 	"context"
 	"fmt"
-	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 	"net/http"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
+
 	"github.com/go-playground/validator/v10"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@@ -212,15 +213,19 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
 	if err != nil {
 		return err
 	}
+	if len(conns) == 0 || (len(conns) == 1 && ws.disCov.IsSelfNode(conns[0])) {
+		return nil
+	}
+
 	wg := errgroup.Group{}
 	wg.SetLimit(concurrentRequest)
 
 	// Online push user online message to other node
 	for _, v := range conns {
 		v := v
-		log.ZDebug(ctx, " sendUserOnlineInfoToOtherNode conn ", "target", v.Target())
-		if v.Target() == ws.disCov.GetSelfConnTarget() {
-			log.ZDebug(ctx, "Filter out this node", "node", v.Target())
+		log.ZDebug(ctx, "sendUserOnlineInfoToOtherNode conn")
+		if ws.disCov.IsSelfNode(v) {
+			log.ZDebug(ctx, "Filter out this node")
 			continue
 		}
 
@@ -231,7 +236,7 @@ func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *C
 				PlatformID: int32(client.PlatformID), Token: client.token,
 			})
 			if err != nil {
-				log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
+				log.ZWarn(ctx, "MultiTerminalLoginCheck err", err)
 			}
 			return nil
 		})
diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index fce0297ee..b85848d9e 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -30,6 +30,7 @@ import (
 	"github.com/IBM/sarama"
 	"github.com/go-redis/redis"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
 	"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
 	"github.com/openimsdk/protocol/constant"
@@ -38,7 +39,6 @@ import (
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/utils/stringutil"
 	"google.golang.org/protobuf/proto"
 )
diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go
index d8836d54e..7892d0276 100644
--- a/internal/msgtransfer/online_msg_to_mongo_handler.go
+++ b/internal/msgtransfer/online_msg_to_mongo_handler.go
@@ -21,9 +21,9 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	pbmsg "github.com/openimsdk/protocol/msg"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 	"google.golang.org/protobuf/proto"
 )
 
diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go
index 5c69da005..c685a188a 100644
--- a/internal/push/offlinepush_handler.go
+++ b/internal/push/offlinepush_handler.go
@@ -7,12 +7,12 @@ import (
 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/protocol/constant"
 	pbpush "github.com/openimsdk/protocol/push"
 	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/utils/jsonutil"
 	"google.golang.org/protobuf/proto"
 )
diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go
index 9521a84a0..2393c3567 100644
--- a/internal/push/onlinepusher.go
+++ b/internal/push/onlinepusher.go
@@ -2,6 +2,8 @@ package push
 
 import (
 	"context"
+	"sync"
+
 	"github.com/openimsdk/protocol/msggateway"
 	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/discovery"
@@ -9,7 +11,6 @@ import (
 	"github.com/openimsdk/tools/utils/datautil"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
-	"sync"
 )
 
 type OnlinePusher interface {
@@ -160,7 +161,7 @@ func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg
 		}
 	}
 	log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
-	var usersConns = make(map[*grpc.ClientConn][]string)
+	var usersConns = make(map[grpc.ClientConnInterface][]string)
 	for host, userIds := range usersHost {
 		tconn, _ := k.disCov.GetConn(ctx, host)
 		usersConns[tconn] = userIds
diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go
index c82c8f898..e88e4b5a9 100644
--- a/internal/push/push_handler.go
+++ b/internal/push/push_handler.go
@@ -14,6 +14,7 @@ import (
 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
@@ -25,7 +26,6 @@ import (
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/utils/datautil"
 	"github.com/openimsdk/tools/utils/jsonutil"
 	"github.com/openimsdk/tools/utils/timeutil"
diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go
index 2bf08783d..a7055f6a5 100644
--- a/internal/rpc/auth/auth.go
+++ b/internal/rpc/auth/auth.go
@@ -192,7 +192,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
 		return err
 	}
 	for _, v := range conns {
-		log.ZDebug(ctx, "forceKickOff", "conn", v.Target())
+		log.ZDebug(ctx, "forceKickOff", "userID", userID, "platformID", platformID)
 		client := msggateway.NewMsgGatewayClient(v)
 		kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
 		_, err := client.KickUserOffline(ctx, kickReq)
diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go
index c6368b916..370865c1a 100644
--- a/internal/rpc/conversation/notification.go
+++ b/internal/rpc/conversation/notification.go
@@ -16,21 +16,22 @@ package conversation
 
 import (
 	"context"
+
+	"github.com/openimsdk/open-im-server/v3/pkg/notification"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 	"github.com/openimsdk/protocol/msg"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
-	"github.com/openimsdk/open-im-server/v3/pkg/notification"
 	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/protocol/sdkws"
 )
 
 type ConversationNotificationSender struct {
-	*rpcclient.NotificationSender
+	*notification.NotificationSender
 }
 
 func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender {
-	return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
+	return &ConversationNotificationSender{notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
 		return msgClient.SendMsg(ctx, req)
 	}))}
 }
diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go
index 0b03be6f5..2cd6c4f27 100644
--- a/internal/rpc/group/group.go
+++ b/internal/rpc/group/group.go
@@ -17,13 +17,14 @@ package group
 import (
 	"context"
 	"fmt"
-	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 	"math/big"
 	"math/rand"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -1129,7 +1130,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
 
 	if notificationFlag {
 		if req.Notification.Value != "" {
-			conversation := &pbconv.ConversationReq{
+			conversation := &pbconversation.ConversationReq{
 				ConversationID:   msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID),
 				ConversationType: constant.ReadGroupChatType,
 				GroupID:          req.GroupID,
diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go
index fb44bc312..ce42d0006 100644
--- a/internal/rpc/group/notification.go
+++ b/internal/rpc/group/notification.go
@@ -52,11 +52,11 @@ const (
 
 func NewNotificationSender(db controller.GroupDatabase, config *Config, userClient *rpcli.UserClient, msgClient *rpcli.MsgClient, conversationClient *rpcli.ConversationClient) *NotificationSender {
 	return &NotificationSender{
-		NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig,
-			rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
+		NotificationSender: notification.NewNotificationSender(&config.NotificationConfig,
+			notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
 				return msgClient.SendMsg(ctx, req)
 			}),
-			rpcclient.WithUserRpcClient(userClient.GetUserInfo),
+			notification.WithUserRpcClient(userClient.GetUserInfo),
 		),
 		getUsersInfo: func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) {
 			users, err := userClient.GetUsersInfo(ctx, userIDs)
@@ -73,7 +73,7 @@ func NewNotificationSender(db controller.GroupDatabase, config *Config, userClie
 }
 
 type NotificationSender struct {
-	*rpcclient.NotificationSender
+	*notification.NotificationSender
 	getUsersInfo       func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
 	db                 controller.GroupDatabase
 	config             *Config
@@ -332,7 +332,7 @@ func (g *NotificationSender) GroupInfoSetNotification(ctx context.Context, tips
 		return
 	}
 	g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
-	g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName())
+	g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, notification.WithRpcGetUserName())
 }
 
 func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) {
diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go
index d5604286a..0daafbe6c 100644
--- a/internal/rpc/msg/notification.go
+++ b/internal/rpc/msg/notification.go
@@ -23,11 +23,11 @@ import (
 )
 
 type MsgNotificationSender struct {
-	*rpcclient.NotificationSender
+	*notification.NotificationSender
 }
 
-func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender {
-	return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)}
+func NewMsgNotificationSender(config *Config, opts ...notification.NotificationSenderOptions) *MsgNotificationSender {
+	return &MsgNotificationSender{notification.NewNotificationSender(&config.NotificationConfig, opts...)}
 }
 
 func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) {
@@ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv
 	}
 	m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
 }
+
+func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) {
+	m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips)
+}
diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go
index 74a6355d7..df5a72075 100644
--- a/internal/rpc/msg/server.go
+++ b/internal/rpc/msg/server.go
@@ -16,6 +16,8 @@ package msg
 
 import (
 	"context"
+
+	"github.com/openimsdk/open-im-server/v3/pkg/notification"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -27,7 +29,6 @@ import (
 	"github.com/openimsdk/tools/db/redisutil"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
-	"github.com/openimsdk/open-im-server/v3/pkg/notification"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
 	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/protocol/conversation"
@@ -63,7 +64,7 @@ type msgServer struct {
 	GroupLocalCache        *rpccache.GroupLocalCache        // Local cache for group data.
 	ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
 	Handlers               MessageInterceptorChain          // Chain of handlers for processing messages.
-	notificationSender     *rpcclient.NotificationSender    // RPC client for sending notifications.
+	notificationSender     *notification.NotificationSender // RPC client for sending notifications.
 	msgNotificationSender  *MsgNotificationSender           // RPC client for sending msg notifications.
 	config                 *Config                          // Global configuration settings.
 	webhookClient          *webhook.Client
@@ -132,8 +133,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
 		conversationClient:     conversationClient,
 	}
 
-	s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
-	s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
+	s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg))
+	s.msgNotificationSender = NewMsgNotificationSender(config, notification.WithLocalSendMsg(s.SendMsg))
 
 	msg.RegisterMsgServer(server, s)
 
diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go
index a34a4d322..caf2dafe1 100644
--- a/internal/rpc/relation/notification.go
+++ b/internal/rpc/relation/notification.go
@@ -16,6 +16,7 @@ package relation
 
 import (
 	"context"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 	"github.com/openimsdk/protocol/msg"
 
@@ -36,7 +37,7 @@ import (
 )
 
 type FriendNotificationSender struct {
-	*rpcclient.NotificationSender
+	*notification.NotificationSender
 	// Target not found err
 	getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
 	// db controller
@@ -89,7 +90,7 @@ func WithRpcFunc(
 
 func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender {
 	f := &FriendNotificationSender{
-		NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
+		NotificationSender: notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
 			return msgClient.SendMsg(ctx, req)
 		})),
 	}
diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go
index 8796fe824..97206dd6d 100644
--- a/internal/rpc/third/s3.go
+++ b/internal/rpc/third/s3.go
@@ -19,11 +19,12 @@ import (
 	"encoding/base64"
 	"encoding/hex"
 	"encoding/json"
-	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
 	"path"
 	"strconv"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/authverify"
+
 	"github.com/google/uuid"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@@ -37,7 +38,10 @@ import (
 )
 
 func (t *thirdServer) PartLimit(ctx context.Context, req *third.PartLimitReq) (*third.PartLimitResp, error) {
-	limit := t.s3dataBase.PartLimit()
+	limit, err := t.s3dataBase.PartLimit()
+	if err != nil {
+		return nil, err
+	}
 	return &third.PartLimitResp{
 		MinPartSize: limit.MinPartSize,
 		MaxPartSize: limit.MaxPartSize,
diff --git a/internal/rpc/user/notification.go b/internal/rpc/user/notification.go
index 03fdf95bd..4fb214f74 100644
--- a/internal/rpc/user/notification.go
+++ b/internal/rpc/user/notification.go
@@ -16,6 +16,7 @@ package user
 
 import (
 	"context"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
 	"github.com/openimsdk/protocol/msg"
 
@@ -29,7 +30,7 @@ import (
 )
 
 type UserNotificationSender struct {
-	*rpcclient.NotificationSender
+	*notification.NotificationSender
 	getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error)
 	// db controller
 	db controller.UserDatabase
@@ -63,7 +64,7 @@ func WithUserFunc(
 
 func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender {
 	f := &UserNotificationSender{
-		NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
+		NotificationSender: notification.NewNotificationSender(&config.NotificationConfig, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
 			return msgClient.SendMsg(ctx, req)
 		})),
 	}
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index e2b9f6c8d..12234233b 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -18,9 +18,9 @@ import (
 	"strings"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/tools/db/mongoutil"
 	"github.com/openimsdk/tools/db/redisutil"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/s3/aws"
 	"github.com/openimsdk/tools/s3/cos"
 	"github.com/openimsdk/tools/s3/kodo"
diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go
index f7175491c..740d27eac 100644
--- a/pkg/common/discoveryregister/discoveryregister.go
+++ b/pkg/common/discoveryregister/discoveryregister.go
@@ -18,17 +18,22 @@ import (
 	"time"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
-	"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/discovery/etcd"
+	"github.com/openimsdk/tools/discovery/kubernetes"
 	"github.com/openimsdk/tools/errs"
+	"google.golang.org/grpc"
 )
 
 // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
 func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) {
 	switch discovery.Enable {
 	case "k8s":
-		return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
+		return kubernetes.NewKubernetesConnManager("default",
+			grpc.WithDefaultCallOptions(
+				grpc.MaxCallSendMsgSize(1024*1024*20),
+			),
+		)
 	case "etcd":
 		return etcd.NewSvcDiscoveryRegistry(
 			discovery.Etcd.RootDirectory,
diff --git a/pkg/common/discoveryregister/kubernetes/doc.go b/pkg/common/discoveryregister/kubernetes/doc.go
deleted file mode 100644
index 8615caa6b..000000000
--- a/pkg/common/discoveryregister/kubernetes/doc.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright © 2024 OpenIM. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discoveryregister/kubernetes/kubernetes.go
deleted file mode 100644
index f1ce0bbdc..000000000
--- a/pkg/common/discoveryregister/kubernetes/kubernetes.go
+++ /dev/null
@@ -1,199 +0,0 @@
-// Copyright © 2023 OpenIM. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package kubernetes
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"os"
-	"strconv"
-	"strings"
-
-	"github.com/openimsdk/tools/discovery"
-	"github.com/openimsdk/tools/log"
-	"github.com/stathat/consistent"
-	"google.golang.org/grpc"
-)
-
-// K8sDR represents the Kubernetes service discovery and registration client.
-type K8sDR struct {
-	options               []grpc.DialOption
-	rpcRegisterAddr       string
-	gatewayHostConsistent *consistent.Consistent
-	gatewayName           string
-}
-
-func NewK8sDiscoveryRegister(gatewayName string) (discovery.SvcDiscoveryRegistry, error) {
-	gatewayConsistent := consistent.New()
-	gatewayHosts := getMsgGatewayHost(context.Background(), gatewayName)
-	for _, v := range gatewayHosts {
-		gatewayConsistent.Add(v)
-	}
-	return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil
-}
-
-func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
-	if serviceName != cli.gatewayName {
-		cli.rpcRegisterAddr = serviceName
-	} else {
-		cli.rpcRegisterAddr = getSelfHost(context.Background(), cli.gatewayName)
-	}
-
-	return nil
-}
-
-func (cli *K8sDR) UnRegister() error {
-
-	return nil
-}
-
-func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
-
-	return nil
-}
-
-func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
-
-	return nil
-}
-
-func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
-
-	return nil, nil
-}
-
-func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
-	host, err := cli.gatewayHostConsistent.Get(userId)
-	if err != nil {
-		log.ZError(ctx, "GetUserIdHashGatewayHost error", err)
-	}
-	return host, err
-}
-
-func getSelfHost(ctx context.Context, gatewayName string) string {
-	port := 88
-	instance := "openimserver"
-	selfPodName := os.Getenv("MY_POD_NAME")
-	ns := os.Getenv("MY_POD_NAMESPACE")
-	statefuleIndex := 0
-	gatewayEnds := strings.Split(gatewayName, ":")
-	if len(gatewayEnds) != 2 {
-		log.ZError(ctx, "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
-	} else {
-		port, _ = strconv.Atoi(gatewayEnds[1])
-	}
-	podInfo := strings.Split(selfPodName, "-")
-	instance = podInfo[0]
-	count := len(podInfo)
-	statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
-	host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
-	return host
-}
-
-// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88.
-// Replica set in kubernetes environment
-func getMsgGatewayHost(ctx context.Context, gatewayName string) []string {
-	port := 88
-	instance := "openimserver"
-	selfPodName := os.Getenv("MY_POD_NAME")
-	replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
-	ns := os.Getenv("MY_POD_NAMESPACE")
-	gatewayEnds := strings.Split(gatewayName, ":")
-	if len(gatewayEnds) != 2 {
-		log.ZError(ctx, "msggateway RpcRegisterName is error:config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
-	} else {
-		port, _ = strconv.Atoi(gatewayEnds[1])
-	}
-	nReplicas, _ := strconv.Atoi(replicas)
-	podInfo := strings.Split(selfPodName, "-")
-	instance = podInfo[0]
-	var ret []string
-	for i := 0; i < nReplicas; i++ {
-		host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
-		ret = append(ret, host)
-	}
-	log.ZDebug(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
-	return ret
-}
-
-// GetConns returns the gRPC client connections to the specified service.
-func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
-
-	// This conditional checks if the serviceName is not the OpenImMessageGatewayName.
-	// It seems to handle a special case for the OpenImMessageGateway.
-	if serviceName != cli.gatewayName {
-		// DialContext creates a client connection to the given target (serviceName) using the specified context.
-		// 'cli.options' are likely default or common options for all connections in this struct.
-		// 'opts...' allows for additional gRPC dial options to be passed and used.
-		conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
-
-		// The function returns a slice of client connections with the new connection, or an error if occurred.
-		return []*grpc.ClientConn{conn}, err
-	} else {
-		// This block is executed if the serviceName is OpenImMessageGatewayName.
-		// 'ret' will accumulate the connections to return.
-		var ret []*grpc.ClientConn
-
-		// getMsgGatewayHost presumably retrieves hosts for the message gateway service.
-		// The context is passed, likely for cancellation and timeout control.
-		gatewayHosts := getMsgGatewayHost(ctx, cli.gatewayName)
-
-		// Iterating over the retrieved gateway hosts.
-		for _, host := range gatewayHosts {
-			// Establishes a connection to each host.
-			// Again, appending cli.options with any additional opts provided.
-			conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
-
-			// If there's an error while dialing any host, the function returns immediately with the error.
-			if err != nil {
-				return nil, err
-			} else {
-				// If the connection is successful, it is added to the 'ret' slice.
-				ret = append(ret, conn)
-			}
-		}
-		// After all hosts are processed, the slice of connections is returned.
-		return ret, nil
-	}
-}
-
-func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
-
-	return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
-}
-
-func (cli *K8sDR) GetSelfConnTarget() string {
-
-	return cli.rpcRegisterAddr
-}
-
-func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
-	cli.options = append(cli.options, opts...)
-}
-
-func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
-	conn.Close()
-}
-
-// do not use this method for call rpc.
-func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
-	log.ZError(context.Background(), "should not call this function!", nil)
-	return nil
-}
-
-func (cli *K8sDR) Close() {
-
-}
diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go
index 3d4394c51..6b6619914 100644
--- a/pkg/common/startrpc/start.go
+++ b/pkg/common/startrpc/start.go
@@ -167,6 +167,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf
 	}
 
 	err = client.Register(
+		ctx,
 		rpcRegisterName,
 		registerIP,
 		port,
diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go
index b92f9b510..e2df028ec 100644
--- a/pkg/common/storage/controller/msg.go
+++ b/pkg/common/storage/controller/msg.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/tools/utils/jsonutil"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
@@ -38,7 +39,6 @@ import (
 	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/utils/datautil"
 )
 
diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go
index f4c0c6270..db62ec289 100644
--- a/pkg/common/storage/controller/msg_transfer.go
+++ b/pkg/common/storage/controller/msg_transfer.go
@@ -2,7 +2,9 @@ package controller
 
 import (
 	"context"
+
 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/tools/utils/datautil"
 
@@ -14,7 +16,6 @@ import (
 	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 	"go.mongodb.org/mongo-driver/mongo"
 )
 
diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go
index 91ef126fe..a805eaf00 100644
--- a/pkg/common/storage/controller/push.go
+++ b/pkg/common/storage/controller/push.go
@@ -19,10 +19,10 @@ import (
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/protocol/push"
 	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/log"
-	"github.com/openimsdk/tools/mq/kafka"
 )
 
 type PushDatabase interface {
diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go
index 6693d2dde..30d8d20ec 100644
--- a/pkg/common/storage/controller/s3.go
+++ b/pkg/common/storage/controller/s3.go
@@ -30,7 +30,7 @@ import (
 )
 
 type S3Database interface {
-	PartLimit() *s3.PartLimit
+	PartLimit() (*s3.PartLimit, error)
 	PartSize(ctx context.Context, size int64) (int64, error)
 	AuthSign(ctx context.Context, uploadID string, partNumbers []int) (*s3.AuthSignResult, error)
 	InitiateMultipartUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*cont.InitiateUploadResult, error)
@@ -65,7 +65,7 @@ func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) {
 	return s.s3.PartSize(ctx, size)
 }
 
-func (s *s3Database) PartLimit() *s3.PartLimit {
+func (s *s3Database) PartLimit() (*s3.PartLimit, error) {
 	return s.s3.PartLimit()
 }
 
diff --git a/pkg/common/storage/kafka/config.go b/pkg/common/storage/kafka/config.go
new file mode 100644
index 000000000..1c9c4b0a0
--- /dev/null
+++ b/pkg/common/storage/kafka/config.go
@@ -0,0 +1,33 @@
+// Copyright © 2024 OpenIM open source community. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+type TLSConfig struct {
+	EnableTLS          bool   `yaml:"enableTLS"`
+	CACrt              string `yaml:"caCrt"`
+	ClientCrt          string `yaml:"clientCrt"`
+	ClientKey          string `yaml:"clientKey"`
+	ClientKeyPwd       string `yaml:"clientKeyPwd"`
+	InsecureSkipVerify bool   `yaml:"insecureSkipVerify"`
+}
+
+type Config struct {
+	Username     string    `yaml:"username"`
+	Password     string    `yaml:"password"`
+	ProducerAck  string    `yaml:"producerAck"`
+	CompressType string    `yaml:"compressType"`
+	Addr         []string  `yaml:"addr"`
+	TLS          TLSConfig `yaml:"tls"`
+}
diff --git a/pkg/common/storage/kafka/consumer_group.go b/pkg/common/storage/kafka/consumer_group.go
new file mode 100644
index 000000000..f0e84bbc9
--- /dev/null
+++ b/pkg/common/storage/kafka/consumer_group.go
@@ -0,0 +1,68 @@
+// Copyright © 2023 OpenIM. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"context"
+	"errors"
+
+	"github.com/IBM/sarama"
+	"github.com/openimsdk/tools/log"
+)
+
+type MConsumerGroup struct {
+	sarama.ConsumerGroup
+	groupID string
+	topics  []string
+}
+
+func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
+	config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
+	if err != nil {
+		return nil, err
+	}
+	group, err := NewConsumerGroup(config, conf.Addr, groupID)
+	if err != nil {
+		return nil, err
+	}
+	return &MConsumerGroup{
+		ConsumerGroup: group,
+		groupID:       groupID,
+		topics:        topics,
+	}, nil
+}
+
+func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
+	return GetContextWithMQHeader(cMsg.Headers)
+}
+
+func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
+	for {
+		err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
+		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
+			return
+		}
+		if errors.Is(err, context.Canceled) {
+			return
+		}
+		if err != nil {
+			log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
+		}
+	}
+}
+
+func (mc *MConsumerGroup) Close() error {
+	return mc.ConsumerGroup.Close()
+}
diff --git a/pkg/common/storage/kafka/producer.go b/pkg/common/storage/kafka/producer.go
new file mode 100644
index 000000000..5f6be29ed
--- /dev/null
+++ b/pkg/common/storage/kafka/producer.go
@@ -0,0 +1,82 @@
+// Copyright © 2023 OpenIM. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"context"
+	"github.com/IBM/sarama"
+	"github.com/openimsdk/tools/errs"
+	"google.golang.org/protobuf/proto"
+)
+
+// Producer represents a Kafka producer.
+type Producer struct {
+	addr     []string
+	topic    string
+	config   *sarama.Config
+	producer sarama.SyncProducer
+}
+
+func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
+	producer, err := NewProducer(config, addr)
+	if err != nil {
+		return nil, err
+	}
+	return &Producer{
+		addr:     addr,
+		topic:    topic,
+		config:   config,
+		producer: producer,
+	}, nil
+}
+
+// SendMessage sends a message to the Kafka topic configured in the Producer.
+func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
+	// Marshal the protobuf message
+	bMsg, err := proto.Marshal(msg)
+	if err != nil {
+		return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
+	}
+	if len(bMsg) == 0 {
+		return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
+	}
+
+	// Prepare Kafka message
+	kMsg := &sarama.ProducerMessage{
+		Topic: p.topic,
+		Key:   sarama.StringEncoder(key),
+		Value: sarama.ByteEncoder(bMsg),
+	}
+
+	// Validate message key and value
+	if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
+		return 0, 0, errs.Wrap(errEmptyMsg)
+	}
+
+	// Attach context metadata as headers
+	header, err := GetMQHeaderWithContext(ctx)
+	if err != nil {
+		return 0, 0, err
+	}
+	kMsg.Headers = header
+
+	// Send the message
+	partition, offset, err := p.producer.SendMessage(kMsg)
+	if err != nil {
+		return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error")
+	}
+
+	return partition, offset, nil
+}
diff --git a/pkg/common/storage/kafka/sarama.go b/pkg/common/storage/kafka/sarama.go
new file mode 100644
index 000000000..23220b4d0
--- /dev/null
+++ b/pkg/common/storage/kafka/sarama.go
@@ -0,0 +1,85 @@
+package kafka
+
+import (
+	"bytes"
+	"strings"
+
+	"github.com/IBM/sarama"
+	"github.com/openimsdk/tools/errs"
+)
+
+func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
+	kfk := sarama.NewConfig()
+	kfk.Version = sarama.V2_0_0_0
+	kfk.Consumer.Offsets.Initial = initial
+	kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
+	kfk.Consumer.Return.Errors = false
+	if conf.Username != "" || conf.Password != "" {
+		kfk.Net.SASL.Enable = true
+		kfk.Net.SASL.User = conf.Username
+		kfk.Net.SASL.Password = conf.Password
+	}
+	if conf.TLS.EnableTLS {
+		tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
+		if err != nil {
+			return nil, err
+		}
+		kfk.Net.TLS.Config = tls
+		kfk.Net.TLS.Enable = true
+	}
+	return kfk, nil
+}
+
+func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
+	cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
+	if err != nil {
+		return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
+	}
+	return cg, nil
+}
+
+func BuildProducerConfig(conf Config) (*sarama.Config, error) {
+	kfk := sarama.NewConfig()
+	kfk.Producer.Return.Successes = true
+	kfk.Producer.Return.Errors = true
+	kfk.Producer.Partitioner = sarama.NewHashPartitioner
+	if conf.Username != "" || conf.Password != "" {
+		kfk.Net.SASL.Enable = true
+		kfk.Net.SASL.User = conf.Username
+		kfk.Net.SASL.Password = conf.Password
+	}
+	switch strings.ToLower(conf.ProducerAck) {
+	case "no_response":
+		kfk.Producer.RequiredAcks = sarama.NoResponse
+	case "wait_for_local":
+		kfk.Producer.RequiredAcks = sarama.WaitForLocal
+	case "wait_for_all":
+		kfk.Producer.RequiredAcks = sarama.WaitForAll
+	default:
+		kfk.Producer.RequiredAcks = sarama.WaitForAll
+	}
+	if conf.CompressType == "" {
+		kfk.Producer.Compression = sarama.CompressionNone
+	} else {
+		if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
+			return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
+		}
+	}
+	if conf.TLS.EnableTLS {
+		tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
+		if err != nil {
+			return nil, err
+		}
+		kfk.Net.TLS.Config = tls
+		kfk.Net.TLS.Enable = true
+	}
+	return kfk, nil
+}
+
+func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
+	producer, err := sarama.NewSyncProducer(addr, conf)
+	if err != nil {
+		return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
+	}
+	return producer, nil
+}
diff --git a/pkg/common/storage/kafka/tls.go b/pkg/common/storage/kafka/tls.go
new file mode 100644
index 000000000..00c89dcc1
--- /dev/null
+++ b/pkg/common/storage/kafka/tls.go
@@ -0,0 +1,83 @@
+// Copyright © 2024 OpenIM open source community. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/pem"
+	"os"
+
+	"github.com/openimsdk/tools/errs"
+)
+
+// decryptPEM decrypts a PEM block using a password.
+func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
+	if len(passphrase) == 0 {
+		return data, nil
+	}
+	b, _ := pem.Decode(data)
+	d, err := x509.DecryptPEMBlock(b, passphrase)
+	if err != nil {
+		return nil, errs.WrapMsg(err, "DecryptPEMBlock failed")
+	}
+	return pem.EncodeToMemory(&pem.Block{
+		Type:  b.Type,
+		Bytes: d,
+	}), nil
+}
+
+func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return nil, errs.WrapMsg(err, "ReadFile failed", "path", path)
+	}
+	return decryptPEM(data, pwd)
+}
+
+// newTLSConfig setup the TLS config from general config file.
+func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) {
+	var tlsConfig tls.Config
+	if clientCertFile != "" && clientKeyFile != "" {
+		certPEMBlock, err := os.ReadFile(clientCertFile)
+		if err != nil {
+			return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile)
+		}
+		keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
+		if err != nil {
+			return nil, err
+		}
+
+		cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
+		if err != nil {
+			return nil, errs.WrapMsg(err, "X509KeyPair failed")
+		}
+		tlsConfig.Certificates = []tls.Certificate{cert}
+	}
+
+	if caCertFile != "" {
+		caCert, err := os.ReadFile(caCertFile)
+		if err != nil {
+			return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile)
+		}
+		caCertPool := x509.NewCertPool()
+		if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
+			return nil, errs.New("AppendCertsFromPEM failed")
+		}
+		tlsConfig.RootCAs = caCertPool
+	}
+	tlsConfig.InsecureSkipVerify = insecureSkipVerify
+	return &tlsConfig, nil
+}
diff --git a/pkg/common/storage/kafka/util.go b/pkg/common/storage/kafka/util.go
new file mode 100644
index 000000000..61abe5450
--- /dev/null
+++ b/pkg/common/storage/kafka/util.go
@@ -0,0 +1,34 @@
+package kafka
+
+import (
+	"context"
+	"errors"
+	"github.com/IBM/sarama"
+	"github.com/openimsdk/protocol/constant"
+	"github.com/openimsdk/tools/mcontext"
+)
+
+var errEmptyMsg = errors.New("kafka binary msg is empty")
+
+// GetMQHeaderWithContext extracts message queue headers from the context.
+func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
+	operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return []sarama.RecordHeader{
+		{Key: []byte(constant.OperationID), Value: []byte(operationID)},
+		{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
+		{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
+		{Key: []byte(constant.ConnID), Value: []byte(connID)},
+	}, nil
+}
+
+// GetContextWithMQHeader creates a context from message queue headers.
+func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
+	var values []string
+	for _, recordHeader := range header {
+		values = append(values, string(recordHeader.Value))
+	}
+	return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
+}
diff --git a/pkg/common/storage/kafka/verify.go b/pkg/common/storage/kafka/verify.go
new file mode 100644
index 000000000..0a09eed4e
--- /dev/null
+++ b/pkg/common/storage/kafka/verify.go
@@ -0,0 +1,79 @@
+// Copyright © 2024 OpenIM open source community. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kafka
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/IBM/sarama"
+	"github.com/openimsdk/tools/errs"
+)
+
+func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
+	kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
+	if err != nil {
+		return err
+	}
+	cli, err := sarama.NewClient(conf.Addr, kfk)
+	if err != nil {
+		return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
+	}
+	defer cli.Close()
+
+	existingTopics, err := cli.Topics()
+	if err != nil {
+		return errs.WrapMsg(err, "Failed to list topics")
+	}
+
+	existingTopicsMap := make(map[string]bool)
+	for _, t := range existingTopics {
+		existingTopicsMap[t] = true
+	}
+
+	for _, topic := range topics {
+		if !existingTopicsMap[topic] {
+			return errs.New("topic not exist", "topic", topic).Wrap()
+		}
+	}
+	return nil
+}
+
+func CheckHealth(ctx context.Context, conf *Config) error {
+	kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
+	if err != nil {
+		return err
+	}
+	cli, err := sarama.NewClient(conf.Addr, kfk)
+	if err != nil {
+		return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
+	}
+	defer cli.Close()
+
+	// Get broker list
+	brokers := cli.Brokers()
+	if len(brokers) == 0 {
+		return errs.New("no brokers found").Wrap()
+	}
+
+	// Check if all brokers are reachable
+	for _, broker := range brokers {
+		if err := broker.Open(kfk); err != nil {
+			return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/notification/msg.go b/pkg/notification/msg.go
index 8e028c577..ba8a9185a 100644
--- a/pkg/notification/msg.go
+++ b/pkg/notification/msg.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package rpcclient
+package notification
 
 import (
 	"context"
@@ -197,7 +197,6 @@ func WithSendMessage(sendMessage *bool) NotificationOptions {
 }
 
 func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) {
-	//ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)})
 	ctx = context.WithoutCancel(ctx)
 	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5))
 	defer cancel()
@@ -214,7 +213,7 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co
 	var req msg.SendMsgReq
 	var msg sdkws.MsgData
 	var userInfo *sdkws.UserInfo
-	if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil {
+	if notificationOpt.RpcGetUsername && s.getUserInfo != nil {
 		userInfo, err = s.getUserInfo(ctx, sendID)
 		if err != nil {
 			log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID)
diff --git a/tools/check-component/main.go b/tools/check-component/main.go
index 94dbd613c..cb1fb2a33 100644
--- a/tools/check-component/main.go
+++ b/tools/check-component/main.go
@@ -26,11 +26,11 @@ import (
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
 	"github.com/openimsdk/tools/db/mongoutil"
 	"github.com/openimsdk/tools/db/redisutil"
 	"github.com/openimsdk/tools/discovery/etcd"
 	"github.com/openimsdk/tools/discovery/zookeeper"
-	"github.com/openimsdk/tools/mq/kafka"
 	"github.com/openimsdk/tools/s3/minio"
 	"github.com/openimsdk/tools/system/program"
 )
diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go
index c9f3a8236..8a4d874bb 100644
--- a/tools/seq/internal/main.go
+++ b/tools/seq/internal/main.go
@@ -44,7 +44,7 @@ const (
 )
 
 func readConfig[T any](dir string, name string) (*T, error) {
-	if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES {
+	if runtimeenv.RuntimeEnvironment() == config.KUBERNETES {
 		dir = os.Getenv(config.MountConfigFilePath)
 	}
 	v := viper.New()