diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go
index e8cf2e799..8b3818aa8 100644
--- a/cmd/open_im_api/main.go
+++ b/cmd/open_im_api/main.go
@@ -38,7 +38,8 @@ func main() {
 	// user routing group, which handles user registration and login services
 	userRouterGroup := r.Group("/user")
 	{
-		userRouterGroup.POST("/update_user_info", user.UpdateUserInfo)              //1
+		userRouterGroup.POST("/update_user_info", user.UpdateUserInfo) //1
+		userRouterGroup.POST("/set_global_msg_recv_opt", user.SetGlobalRecvMessageOpt)
 		userRouterGroup.POST("/get_users_info", user.GetUsersInfo)                  //1
 		userRouterGroup.POST("/get_self_user_info", user.GetSelfUserInfo)           //1
 		userRouterGroup.POST("/get_users_online_status", user.GetUsersOnlineStatus) //1
diff --git a/config/config.yaml b/config/config.yaml
index 4d9569d07..3143f3a0b 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -55,8 +55,8 @@ kafka:
     addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可
     topic: "ms2ps_chat"
   consumergroupid:
+    msgToRedis: redis
     msgToMongo: mongo
-    msgToMongoOffline: mongo_offline
     msgToMySql: mysql
     msgToPush: push
 
diff --git a/internal/api/user/user.go b/internal/api/user/user.go
index af93416b4..ac40059d3 100644
--- a/internal/api/user/user.go
+++ b/internal/api/user/user.go
@@ -233,6 +233,44 @@ func UpdateUserInfo(c *gin.Context) {
 	log.NewInfo(req.OperationID, "UpdateUserInfo api return ", resp)
 	c.JSON(http.StatusOK, resp)
 }
+func SetGlobalRecvMessageOpt(c *gin.Context) {
+	params := api.SetGlobalRecvMessageOptReq{}
+	if err := c.BindJSON(&params); err != nil {
+		log.NewError("0", "BindJSON failed ", err.Error())
+		c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
+		return
+	}
+	req := &rpc.SetGlobalRecvMessageOptReq{}
+	utils.CopyStructFields(req, &params)
+	req.OperationID = params.OperationID
+	var ok bool
+	var errInfo string
+	ok, req.UserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
+	if !ok {
+		errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
+		log.NewError(req.OperationID, errMsg)
+		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
+		return
+	}
+	log.NewInfo(params.OperationID, "SetGlobalRecvMessageOpt args ", req.String())
+	etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName, req.OperationID)
+	if etcdConn == nil {
+		errMsg := req.OperationID + "getcdv3.GetConn == nil"
+		log.NewError(req.OperationID, errMsg)
+		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
+		return
+	}
+	client := rpc.NewUserClient(etcdConn)
+	RpcResp, err := client.SetGlobalRecvMessageOpt(context.Background(), req)
+	if err != nil {
+		log.NewError(req.OperationID, "SetGlobalRecvMessageOpt failed ", err.Error(), req.String())
+		c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call  rpc server failed"})
+		return
+	}
+	resp := api.UpdateUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}}
+	log.NewInfo(req.OperationID, "SetGlobalRecvMessageOpt api return ", resp)
+	c.JSON(http.StatusOK, resp)
+}
 
 func GetSelfUserInfo(c *gin.Context) {
 	params := api.GetSelfUserInfoReq{}
diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go
index 65457878c..168bfe785 100644
--- a/internal/msg_transfer/logic/init.go
+++ b/internal/msg_transfer/logic/init.go
@@ -4,7 +4,6 @@ import (
 	"Open_IM/pkg/common/config"
 	"Open_IM/pkg/common/constant"
 	"Open_IM/pkg/common/kafka"
-	"Open_IM/pkg/common/log"
 	"Open_IM/pkg/statistics"
 	"fmt"
 	"sync"
@@ -20,7 +19,8 @@ const ChannelNum = 100
 
 var (
 	persistentCH          PersistentConsumerHandler
-	historyCH             OnlineHistoryConsumerHandler
+	historyCH             OnlineHistoryRedisConsumerHandler
+	historyMongoCH        OnlineHistoryMongoConsumerHandler
 	producer              *kafka.Producer
 	producerToMongo       *kafka.Producer
 	cmdCh                 chan Cmd2Value
@@ -39,7 +39,6 @@ func Init() {
 	persistentCH.Init()
 	historyCH.Init(cmdCh)
 	onlineTopicStatus = OnlineTopicVacancy
-	log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic)
 	//offlineHistoryCH.Init(cmdCh)
 	statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
 	statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
@@ -54,6 +53,7 @@ func Run() {
 		fmt.Println("not start mysql consumer")
 	}
 	go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
+	//go historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyMongoCH)
 	//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
 }
 func SetOnlineTopicStatus(status int) {
diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go
index a1544549d..908d28b27 100644
--- a/internal/msg_transfer/logic/online_history_msg_handler.go
+++ b/internal/msg_transfer/logic/online_history_msg_handler.go
@@ -9,10 +9,8 @@ import (
 	"Open_IM/pkg/grpc-etcdv3/getcdv3"
 	pbMsg "Open_IM/pkg/proto/chat"
 	pbPush "Open_IM/pkg/proto/push"
-	server_api_params "Open_IM/pkg/proto/sdk_ws"
 	"Open_IM/pkg/utils"
 	"context"
-	"errors"
 	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
 	"hash/crc32"
@@ -36,28 +34,21 @@ type Cmd2Value struct {
 	Cmd   int
 	Value interface{}
 }
-type OnlineHistoryConsumerHandler struct {
+type OnlineHistoryRedisConsumerHandler struct {
 	msgHandle            map[string]fcb
 	historyConsumerGroup *kfk.MConsumerGroup
-	cmdCh                chan Cmd2Value
 	chArrays             [ChannelNum]chan Cmd2Value
-	chMongoArrays        [ChannelNum]chan Cmd2Value
 	msgDistributionCh    chan Cmd2Value
 }
 
-func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
+func (och *OnlineHistoryRedisConsumerHandler) Init(cmdCh chan Cmd2Value) {
 	och.msgHandle = make(map[string]fcb)
 	och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
 	go och.MessagesDistributionHandle()
-	och.cmdCh = cmdCh
 	for i := 0; i < ChannelNum; i++ {
 		och.chArrays[i] = make(chan Cmd2Value, 50)
 		go och.Run(i)
 	}
-	for i := 0; i < ChannelNum; i++ {
-		och.chMongoArrays[i] = make(chan Cmd2Value, 10000)
-		go och.MongoMessageRun(i)
-	}
 	if config.Config.ReliableStorage {
 		och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo
 	} else {
@@ -66,34 +57,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) {
 	}
 	och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
 		OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
-		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
+		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
 
 }
-func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) {
-	operationID := utils.OperationIDGenerator()
-	err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1)
-	if err != nil {
-		log.Error(operationID, "TriggerCmd failed ", err.Error(), status)
-		return
-	}
-	log.Debug(operationID, "TriggerCmd success", status)
-
-}
-func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error {
-	var flag = 0
-	select {
-	case ch <- value:
-		flag = 1
-	case <-time.After(time.Second * time.Duration(timeout)):
-		flag = 2
-	}
-	if flag == 1 {
-		return nil
-	} else {
-		return errors.New("send cmd timeout")
-	}
-}
-func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
+func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 	for {
 		select {
 		case cmd := <-och.chArrays[channelID]:
@@ -154,7 +121,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
 		}
 	}
 }
-func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
+func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
 	pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
 	if err != nil {
 		log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
@@ -167,47 +134,48 @@ func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID stri
 	////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
 	//och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
 }
-func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
-	for {
-		select {
-		case cmd := <-och.chMongoArrays[channelID]:
-			switch cmd.Cmd {
-			case MongoMessages:
-				msgChannelValue := cmd.Value.(MsgChannelValue)
-				msgList := msgChannelValue.msgList
-				triggerID := msgChannelValue.triggerID
-				aggregationID := msgChannelValue.aggregationID
-				lastSeq := msgChannelValue.lastSeq
-				err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
-				if err != nil {
-					log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
-				}
-				for _, v := range msgList {
-					if v.MsgData.ContentType == constant.DeleteMessageNotification {
-						tips := server_api_params.TipsComm{}
-						DeleteMessageTips := server_api_params.DeleteMessageTips{}
-						err := proto.Unmarshal(v.MsgData.Content, &tips)
-						if err != nil {
-							log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
-							continue
-						}
-						err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
-						if err != nil {
-							log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
-							continue
-						}
-						if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
-							log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
-						}
 
-					}
-				}
-			}
-		}
-	}
-}
+//func (och *OnlineHistoryRedisConsumerHandler) MongoMessageRun(channelID int) {
+//	for {
+//		select {
+//		case cmd := <-och.chMongoArrays[channelID]:
+//			switch cmd.Cmd {
+//			case MongoMessages:
+//				msgChannelValue := cmd.Value.(MsgChannelValue)
+//				msgList := msgChannelValue.msgList
+//				triggerID := msgChannelValue.triggerID
+//				aggregationID := msgChannelValue.aggregationID
+//				lastSeq := msgChannelValue.lastSeq
+//				err := db.DB.BatchInsertChat2DB(aggregationID, msgList, triggerID, lastSeq)
+//				if err != nil {
+//					log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
+//				}
+//				for _, v := range msgList {
+//					if v.MsgData.ContentType == constant.DeleteMessageNotification {
+//						tips := server_api_params.TipsComm{}
+//						DeleteMessageTips := server_api_params.DeleteMessageTips{}
+//						err := proto.Unmarshal(v.MsgData.Content, &tips)
+//						if err != nil {
+//							log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
+//							continue
+//						}
+//						err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
+//						if err != nil {
+//							log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
+//							continue
+//						}
+//						if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
+//							log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
+//						}
+//
+//					}
+//				}
+//			}
+//		}
+//	}
+//}
 
-func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
+func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 	for {
 		aggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)
 		select {
@@ -253,7 +221,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() {
 	}
 
 }
-func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
+func (mc *OnlineHistoryRedisConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
 	msg := cMsg.Value
 	now := time.Now()
 	msgFromMQ := pbMsg.MsgDataToMQ{}
@@ -325,7 +293,7 @@ func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Consumer
 	log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String())
 }
 
-func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
+func (och *OnlineHistoryRedisConsumerHandler) handleChatWs2MongoLowReliability(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) {
 	msg := cMsg.Value
 	msgFromMQ := pbMsg.MsgDataToMQ{}
 	err := proto.Unmarshal(msg, &msgFromMQ)
@@ -365,10 +333,10 @@ func (och *OnlineHistoryConsumerHandler) handleChatWs2MongoLowReliability(cMsg *
 	}
 }
 
-func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
-func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+func (OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
+func (OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 
-//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 //	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 //	log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
 //	for msg := range claim.Messages() {
@@ -385,7 +353,7 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error
 //	return nil
 //}
 
-func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 
 	for {
@@ -480,7 +448,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS
 	return nil
 }
 
-//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+//func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 //	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 //
 //	for {
diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go
new file mode 100644
index 000000000..1d71b8a1c
--- /dev/null
+++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go
@@ -0,0 +1,79 @@
+package logic
+
+import (
+	"Open_IM/pkg/common/config"
+	"Open_IM/pkg/common/constant"
+	"Open_IM/pkg/common/db"
+	kfk "Open_IM/pkg/common/kafka"
+	"Open_IM/pkg/common/log"
+	pbMsg "Open_IM/pkg/proto/chat"
+	server_api_params "Open_IM/pkg/proto/sdk_ws"
+	"Open_IM/pkg/utils"
+	"github.com/Shopify/sarama"
+	"github.com/golang/protobuf/proto"
+)
+
+type OnlineHistoryMongoConsumerHandler struct {
+	msgHandle            map[string]fcb
+	historyConsumerGroup *kfk.MConsumerGroup
+}
+
+func (och *OnlineHistoryMongoConsumerHandler) Init(cmdCh chan Cmd2Value) {
+	och.msgHandle = make(map[string]fcb)
+	och.msgHandle[config.Config.Kafka.MsgToMongo.Topic] = och.handleChatWs2Mongo
+	och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
+		OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
+		config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
+
+}
+func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
+	msg := cMsg.Value
+	msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
+	err := proto.Unmarshal(msg, &msgFromMQ)
+	if err != nil {
+		log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
+		return
+	}
+	err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
+	if err != nil {
+		log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList)
+	}
+	for _, v := range msgFromMQ.MessageList {
+		if v.MsgData.ContentType == constant.DeleteMessageNotification {
+			tips := server_api_params.TipsComm{}
+			DeleteMessageTips := server_api_params.DeleteMessageTips{}
+			err := proto.Unmarshal(v.MsgData.Content, &tips)
+			if err != nil {
+				log.NewError(msgFromMQ.TriggerID, "tips unmarshal err:", err.Error(), v.String())
+				continue
+			}
+			err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
+			if err != nil {
+				log.NewError(msgFromMQ.TriggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
+				continue
+			}
+			if unexistSeqList, err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
+				log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error(), unexistSeqList)
+			}
+
+		}
+	}
+}
+
+func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
+func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+
+func (och *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
+	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
+	log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
+	for msg := range claim.Messages() {
+		log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
+		if len(msg.Value) != 0 {
+			och.msgHandle[msg.Topic](msg, string(msg.Key), sess)
+		} else {
+			log.Error("", "mongo msg get from kafka but is nil", msg.Key)
+		}
+		sess.MarkMessage(msg, "")
+	}
+	return nil
+}
diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go
index 86bd07c5b..fa733cbe4 100644
--- a/internal/rpc/user/user.go
+++ b/internal/rpc/user/user.go
@@ -433,7 +433,28 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI
 	//}
 	return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{}}, nil
 }
+func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.SetGlobalRecvMessageOptReq) (*pbUser.SetGlobalRecvMessageOptResp, error) {
+	log.NewInfo(req.OperationID, "SetGlobalRecvMessageOpt args ", req.String())
 
+	var user db.User
+	user.UserID = req.UserID
+	m := make(map[string]interface{}, 1)
+
+	log.NewDebug(req.OperationID, utils.GetSelfFuncName(), req.GlobalRecvMsgOpt, "set GlobalRecvMsgOpt")
+	m["global_recv_msg_opt"] = req.GlobalRecvMsgOpt
+	err := db.DB.SetUserGlobalMsgRecvOpt(user.UserID, req.GlobalRecvMsgOpt)
+	if err != nil {
+		log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetGlobalRecvMessageOpt failed ", err.Error(), user)
+		return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
+	}
+	err = imdb.UpdateUserInfoByMap(user, m)
+	if err != nil {
+		log.NewError(req.OperationID, "SetGlobalRecvMessageOpt failed ", err.Error(), user)
+		return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
+	}
+	chat.UserInfoUpdatedNotification(req.OperationID, req.UserID, req.UserID)
+	return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{}}, nil
+}
 func (s *userServer) SyncJoinedGroupMemberFaceURL(userID string, faceURL string, operationID string, opUserID string) {
 	joinedGroupIDList, err := imdb.GetJoinedGroupIDListByUserID(userID)
 	if err != nil {
diff --git a/pkg/base_info/user_api_struct.go b/pkg/base_info/user_api_struct.go
index b517f24de..45c8cb22d 100644
--- a/pkg/base_info/user_api_struct.go
+++ b/pkg/base_info/user_api_struct.go
@@ -18,7 +18,13 @@ type UpdateSelfUserInfoReq struct {
 	ApiUserInfo
 	OperationID string `json:"operationID" binding:"required"`
 }
-
+type SetGlobalRecvMessageOptReq struct {
+	OperationID      string `json:"operationID" binding:"required"`
+	GlobalRecvMsgOpt *int32 `json:"globalRecvMsgOpt" binding:"omitempty,oneof=0 1 2"`
+}
+type SetGlobalRecvMessageOptResp struct {
+	CommResp
+}
 type UpdateUserInfoResp struct {
 	CommResp
 }
diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go
index 185236744..de0d1597e 100644
--- a/pkg/common/config/config.go
+++ b/pkg/common/config/config.go
@@ -213,10 +213,10 @@ type config struct {
 			Topic string   `yaml:"topic"`
 		}
 		ConsumerGroupID struct {
-			MsgToMongo        string `yaml:"msgToMongo"`
-			MsgToMongoOffline string `yaml:"msgToMongoOffline"`
-			MsgToMySql        string `yaml:"msgToMySql"`
-			MsgToPush         string `yaml:"msgToPush"`
+			MsgToRedis string `yaml:"msgToRedis"`
+			MsgToMongo string `yaml:"msgToMongo"`
+			MsgToMySql string `yaml:"msgToMySql"`
+			MsgToPush  string `yaml:"msgToPush"`
 		}
 	}
 	Secret                            string `yaml:"secret"`
diff --git a/script/start_all.sh b/script/start_all.sh
index bf7170d68..9fcbd1c6a 100644
--- a/script/start_all.sh
+++ b/script/start_all.sh
@@ -5,10 +5,10 @@
 #fixme Put the shell script name here
 need_to_start_server_shell=(
   start_rpc_service.sh
-  msg_gateway_start.sh
   push_start.sh
   msg_transfer_start.sh
   sdk_svr_start.sh
+  msg_gateway_start.sh
   demo_svr_start.sh
 )
 time=`date +"%Y-%m-%d %H:%M:%S"`