diff --git a/config/config.yaml b/config/config.yaml index a3e836a15..85c5bc9f1 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -47,6 +47,9 @@ kafka: ws2mschatoffline: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat_offline" + msgtomongo: + addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 + topic: "msg_to_mongo" ms2pschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ms2ps_chat" diff --git a/deploy_k8s/admin_cms/admin_cms.Dockerfile b/deploy_k8s/admin_cms/admin_cms.Dockerfile index 1144ef09d..a7b33da70 100644 --- a/deploy_k8s/admin_cms/admin_cms.Dockerfile +++ b/deploy_k8s/admin_cms/admin_cms.Dockerfile @@ -2,6 +2,7 @@ FROM ubuntu # 设置固定的项目路径 ENV WORKDIR /Open-IM-Server +ENV CMDDIR $WORKDIR/cmd ENV CONFIG_NAME $WORKDIR/config/config.yaml @@ -15,5 +16,5 @@ RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \ VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config", "/Open-IM-Server/script"] -WORKDIR $WORKDIR -CMD ./cmd/main +WORKDIR $CMDDIR +CMD ./main diff --git a/deploy_k8s/api/api.Dockerfile b/deploy_k8s/api/api.Dockerfile index f1dd4c748..644621d5f 100644 --- a/deploy_k8s/api/api.Dockerfile +++ b/deploy_k8s/api/api.Dockerfile @@ -2,6 +2,7 @@ FROM ubuntu # 设置固定的项目路径 ENV WORKDIR /Open-IM-Server +ENV CMDDIR $WORKDIR/cmd ENV CONFIG_NAME $WORKDIR/config/config.yaml # 将可执行文件复制到目标目录 @@ -14,5 +15,5 @@ RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \ VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"] -WORKDIR $WORKDIR -CMD ./cmd/main +WORKDIR $CMDDIR +CMD ./main diff --git a/deploy_k8s/api/deployment.yaml b/deploy_k8s/api/deployment.yaml index 52990ca2f..ee9d0c8a7 100644 --- a/deploy_k8s/api/deployment.yaml +++ b/deploy_k8s/api/deployment.yaml @@ -16,7 +16,7 @@ spec: containers: - name: api image: openim/api:v2.0.10k - # imagePullPolicy: Always + imagePullPolicy: Always ports: - containerPort: 10002 volumeMounts: @@ -42,3 +42,4 @@ spec: targetPort: 10002 selector: app: api + type: NodePort \ No newline at end of file diff --git a/deploy_k8s/cms_api/deployment.yaml b/deploy_k8s/cms_api/deployment.yaml index 4dd0626ac..9f66431d2 100644 --- a/deploy_k8s/cms_api/deployment.yaml +++ b/deploy_k8s/cms_api/deployment.yaml @@ -41,4 +41,5 @@ spec: port: 10006 targetPort: 10006 selector: - app: cms-api \ No newline at end of file + app: cms-api + type: NodePort \ No newline at end of file diff --git a/deploy_k8s/demo/deployment.yaml b/deploy_k8s/demo/deployment.yaml index d12f902b9..228cc8278 100644 --- a/deploy_k8s/demo/deployment.yaml +++ b/deploy_k8s/demo/deployment.yaml @@ -41,4 +41,5 @@ spec: port: 10004 targetPort: 10004 selector: - app: demo \ No newline at end of file + app: demo + type: NodePort \ No newline at end of file diff --git a/deploy_k8s/ingress.yaml b/deploy_k8s/ingress.yaml index f2d247d0f..6d7fbda6d 100644 --- a/deploy_k8s/ingress.yaml +++ b/deploy_k8s/ingress.yaml @@ -77,7 +77,7 @@ spec: service: name: demo port: - number: 10004 + number: 10004 path: / pathType: Prefix --- diff --git a/deploy_k8s/msg_gateway/deployment.yaml b/deploy_k8s/msg_gateway/deployment.yaml index 0eec0f5f5..72352386e 100644 --- a/deploy_k8s/msg_gateway/deployment.yaml +++ b/deploy_k8s/msg_gateway/deployment.yaml @@ -45,5 +45,6 @@ spec: targetPort: ws-port selector: app: msg-gateway + type: NodePort \ No newline at end of file diff --git a/deploy_k8s/sdk_server/deployment.yaml b/deploy_k8s/sdk_server/deployment.yaml index 57cb0c412..063b3d9d9 100644 --- a/deploy_k8s/sdk_server/deployment.yaml +++ b/deploy_k8s/sdk_server/deployment.yaml @@ -49,5 +49,6 @@ spec: targetPort: 10003 selector: app: sdk-server + type: NodePort \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 4a419ae1b..e1d2a98b2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -75,7 +75,7 @@ services: TZ: Asia/Shanghai KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: 127.0.0.1:2181 - KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1" + KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1,msg_to_mongo:2:1" KAFKA_ADVERTISED_LISTENERS: INSIDE://127.0.0.1:9092,OUTSIDE://103.116.45.174:9093 KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT" diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 8829906e4..65457878c 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -22,6 +22,7 @@ var ( persistentCH PersistentConsumerHandler historyCH OnlineHistoryConsumerHandler producer *kafka.Producer + producerToMongo *kafka.Producer cmdCh chan Cmd2Value onlineTopicStatus int w *sync.Mutex @@ -43,6 +44,7 @@ func Init() { statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) + producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic) } func Run() { //register mysqlConsumerHandler to diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index c5e85b4e4..a1544549d 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -155,11 +155,17 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { - hashCode := getHashCode(aggregationID) - channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID) - //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} + pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID) + if err != nil { + log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) + } else { + // log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID) + } + //hashCode := getHashCode(aggregationID) + //channelID := hashCode % ChannelNum + //log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID) + ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + //och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}} } func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { for { diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 450f57fb0..f82c484bb 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -71,6 +71,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil { log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData) + return } //Use offline push messaging var UIDList []string @@ -138,9 +139,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } else { log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData) } - } - } func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 3dc98c3a7..6b6f33198 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -462,13 +462,29 @@ func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, } func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool { - conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) - opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID) - if err != nil && err != go_redis.Nil { - log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", conversationID, pb.String(), err.Error()) - return true + opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID) + if err != nil { + log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error()) + } switch opt { + case constant.ReceiveMessage: + case constant.NotReceiveMessage: + return false + case constant.ReceiveNotNotifyMessage: + if pb.MsgData.Options == nil { + pb.MsgData.Options = make(map[string]bool, 10) + } + utils.SetSwitchFromOptions(pb.MsgData.Options, constant.IsOfflinePush, false) + return true + } + conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) + singleOpt, sErr := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID) + if sErr != nil && sErr != go_redis.Nil { + log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", conversationID, pb.String(), sErr.Error()) + return true + } + switch singleOpt { case constant.ReceiveMessage: return true case constant.NotReceiveMessage: diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 0d7a2fc82..7c274bddc 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -391,11 +391,17 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetUserGlobalMsgRecvOpt failed ", err.Error(), user) return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil } - } - err := imdb.UpdateUserInfo(user, m) - if err != nil { - log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), user) - return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil + err = imdb.UpdateUserInfoByMap(user, m) + if err != nil { + log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), user) + return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil + } + } else { + err := imdb.UpdateUserInfo(user) + if err != nil { + log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), user) + return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil + } } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName, req.OperationID) if etcdConn == nil { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index dff8a693e..0618ca7d7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -203,6 +203,10 @@ type config struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } + MsgToMongo struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } Ms2pschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` diff --git a/pkg/common/db/model.go b/pkg/common/db/model.go index 2418e7658..a6ed0408a 100644 --- a/pkg/common/db/model.go +++ b/pkg/common/db/model.go @@ -120,8 +120,8 @@ func init() { }) //DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{ // Addrs: []string{config.Config.Redis.DBAddress}, - // PoolSize: 100, - // Password: config.Config.Redis.DBPassWord, + // PoolSize: 50, + //Password: config.Config.Redis.DBPassWord, //}) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_model.go b/pkg/common/db/mysql_model/im_mysql_model/user_model.go index e58ed8327..8882f6931 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/user_model.go @@ -96,16 +96,24 @@ func GetUserNameByUserID(userID string) (string, error) { return user.Nickname, nil } -func UpdateUserInfo(user db.User, m ...map[string]interface{}) error { +func UpdateUserInfo(user db.User) error { dbConn, err := db.DB.MysqlDB.DefaultGormDB() if err != nil { return err } dbConn.LogMode(false) err = dbConn.Table("users").Where("user_id=?", user.UserID).Update(&user).Error - if len(m) > 0 { - err = dbConn.Table("users").Where("user_id=?", user.UserID).Updates(m[0]).Error + + return err +} + +func UpdateUserInfoByMap(user db.User, m map[string]interface{}) error { + dbConn, err := db.DB.MysqlDB.DefaultGormDB() + if err != nil { + return err } + dbConn.LogMode(false) + err = dbConn.Table("users").Where("user_id=?", user.UserID).Updates(m).Error return err }