Merge remote-tracking branch 'origin/superGroup' into superGroup

# Conflicts:
#	internal/rpc/user/user.go
skiffer-git committed 2022-06-16 14:10:56 +08:00
commit 8c82f811cb
18 changed files with 82 additions and 31 deletions

View File

@@ -47,6 +47,9 @@ kafka:
   ws2mschatoffline:
     addr: [ 127.0.0.1:9092 ] # the default kafka config is fine
     topic: "ws2ms_chat_offline"
+  msgtomongo:
+    addr: [ 127.0.0.1:9092 ] # the default kafka config is fine
+    topic: "msg_to_mongo"
   ms2pschat:
     addr: [ 127.0.0.1:9092 ] # the default kafka config is fine
     topic: "ms2ps_chat"

View File

@@ -2,6 +2,7 @@ FROM ubuntu
 # set a fixed project path
 ENV WORKDIR /Open-IM-Server
+ENV CMDDIR $WORKDIR/cmd
 ENV CONFIG_NAME $WORKDIR/config/config.yaml
@@ -15,5 +16,5 @@ RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \
 VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config", "/Open-IM-Server/script"]
-WORKDIR $WORKDIR
-CMD ./cmd/main
+WORKDIR $CMDDIR
+CMD ./main

View File

@@ -2,6 +2,7 @@ FROM ubuntu
 # set a fixed project path
 ENV WORKDIR /Open-IM-Server
+ENV CMDDIR $WORKDIR/cmd
 ENV CONFIG_NAME $WORKDIR/config/config.yaml
 # copy the executable into the target directory
@@ -14,5 +15,5 @@ RUN mkdir $WORKDIR/logs $WORKDIR/config $WORKDIR/script && \
 VOLUME ["/Open-IM-Server/logs","/Open-IM-Server/config","/Open-IM-Server/script"]
-WORKDIR $WORKDIR
-CMD ./cmd/main
+WORKDIR $CMDDIR
+CMD ./main

View File

@@ -16,7 +16,7 @@ spec:
       containers:
         - name: api
           image: openim/api:v2.0.10k
-          # imagePullPolicy: Always
+          imagePullPolicy: Always
           ports:
             - containerPort: 10002
           volumeMounts:
@@ -42,3 +42,4 @@ spec:
       targetPort: 10002
   selector:
     app: api
+  type: NodePort

View File

@@ -42,3 +42,4 @@ spec:
       targetPort: 10006
   selector:
     app: cms-api
+  type: NodePort

View File

@@ -42,3 +42,4 @@ spec:
       targetPort: 10004
   selector:
     app: demo
+  type: NodePort

View File

@@ -45,5 +45,6 @@ spec:
       targetPort: ws-port
   selector:
     app: msg-gateway
+  type: NodePort

View File

@@ -49,5 +49,6 @@ spec:
       targetPort: 10003
   selector:
     app: sdk-server
+  type: NodePort

View File

@@ -75,7 +75,7 @@ services:
       TZ: Asia/Shanghai
       KAFKA_BROKER_ID: 0
       KAFKA_ZOOKEEPER_CONNECT: 127.0.0.1:2181
-      KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1"
+      KAFKA_CREATE_TOPICS: "ws2ms_chat:2:1,ms2ps_chat:2:1,msg_to_mongo:2:1"
       KAFKA_ADVERTISED_LISTENERS: INSIDE://127.0.0.1:9092,OUTSIDE://103.116.45.174:9093
       KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
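
Note: the wurstmeister/kafka image reads KAFKA_CREATE_TOPICS as topic:partitions:replicas, so the new entry provisions msg_to_mongo with 2 partitions and replication factor 1. The project relies on this compose variable, but for reference the same topic could be created programmatically; a minimal sketch using Shopify/sarama's ClusterAdmin, with the broker address taken from this compose file:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.Version = sarama.V2_0_0_0 // the admin API needs an explicit broker version

	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, conf)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// "msg_to_mongo:2:1" in KAFKA_CREATE_TOPICS == 2 partitions, replication factor 1.
	if err := admin.CreateTopic("msg_to_mongo", &sarama.TopicDetail{
		NumPartitions:     2,
		ReplicationFactor: 1,
	}, false); err != nil {
		log.Fatal(err)
	}
}
```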

View File

@@ -22,6 +22,7 @@ var (
     persistentCH      PersistentConsumerHandler
     historyCH         OnlineHistoryConsumerHandler
     producer          *kafka.Producer
+    producerToMongo   *kafka.Producer
     cmdCh             chan Cmd2Value
     onlineTopicStatus int
     w                 *sync.Mutex
@@ -43,6 +44,7 @@ func Init() {
     statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
     statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
     producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
+    producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
 }
 func Run() {
     //register mysqlConsumerHandler to

View File

@@ -155,11 +155,17 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
         }
     }
 func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
-    hashCode := getHashCode(aggregationID)
-    channelID := hashCode % ChannelNum
-    log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
-    //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
-    och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
+    pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
+    if err != nil {
+        log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
+    } else {
+        // log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
+    }
+    //hashCode := getHashCode(aggregationID)
+    //channelID := hashCode % ChannelNum
+    //log.Debug(triggerID, "generate channelID", hashCode, channelID, aggregationID)
+    ////go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) {
+    //och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{aggregationID: aggregationID, msgList: messages, triggerID: triggerID, lastSeq: lastSeq}}
 }
 func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
     for {
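
With this change, message batches are no longer pushed straight into an in-process channel; they are published as MsgDataToMongoByMQ records on the msg_to_mongo topic, and a separate consumer has to drain that topic and write to Mongo. That consumer is not part of this diff, so the following is only a hypothetical sketch of what such a handler could look like with sarama's consumer-group API; the mongoBatchWriter interface, the handler type, and the protobuf import path are assumptions, not the project's real names.

```go
package msgtransfer

import (
	"log"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"

	pbMsg "Open_IM/pkg/proto/chat" // assumed import path for the generated messages
)

// mongoBatchWriter is a hypothetical seam for whatever actually writes to Mongo.
type mongoBatchWriter interface {
	PersistBatch(aggregationID string, lastSeq uint64, msgs []*pbMsg.MsgDataToMQ) error
}

// msgToMongoHandler consumes the msg_to_mongo topic filled by SendMessageToMongoCH.
type msgToMongoHandler struct {
	writer mongoBatchWriter
}

func (h *msgToMongoHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *msgToMongoHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *msgToMongoHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		batch := &pbMsg.MsgDataToMongoByMQ{}
		if err := proto.Unmarshal(msg.Value, batch); err != nil {
			log.Println("unmarshal MsgDataToMongoByMQ failed:", err)
			continue
		}
		// Messages are keyed by aggregationID, so one partition sees a user/group in order.
		if err := h.writer.PersistBatch(batch.AggregationID, batch.LastSeq, batch.MessageList); err != nil {
			log.Println("persist to mongo failed:", err)
			continue
		}
		sess.MarkMessage(msg, "") // commit the offset only after Mongo has the batch
	}
	return nil
}
```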

View File

@@ -71,6 +71,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
     }
     if err := db.DB.HandleSignalInfo(pushMsg.OperationID, pushMsg.MsgData); err != nil {
         log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), err.Error(), pushMsg.MsgData)
+        return
     }
     //Use offline push messaging
     var UIDList []string
@@ -138,9 +139,7 @@
             } else {
                 log.NewDebug(pushMsg.OperationID, "offline push return result is ", pushResult, pushMsg.MsgData)
             }
         }
     }
 func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {

View File

@@ -462,13 +462,29 @@ func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32,
 }
 func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool {
-    conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
-    opt, err := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID)
-    if err != nil && err != go_redis.Nil {
-        log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", conversationID, pb.String(), err.Error())
-        return true
+    opt, err := db.DB.GetUserGlobalMsgRecvOpt(userID)
+    if err != nil {
+        log.NewError(pb.OperationID, "GetUserGlobalMsgRecvOpt from redis err", userID, pb.String(), err.Error())
     }
     switch opt {
+    case constant.ReceiveMessage:
+    case constant.NotReceiveMessage:
+        return false
+    case constant.ReceiveNotNotifyMessage:
+        if pb.MsgData.Options == nil {
+            pb.MsgData.Options = make(map[string]bool, 10)
+        }
+        utils.SetSwitchFromOptions(pb.MsgData.Options, constant.IsOfflinePush, false)
+        return true
+    }
+    conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType)
+    singleOpt, sErr := db.DB.GetSingleConversationRecvMsgOpt(userID, conversationID)
+    if sErr != nil && sErr != go_redis.Nil {
+        log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", conversationID, pb.String(), sErr.Error())
+        return true
+    }
+    switch singleOpt {
     case constant.ReceiveMessage:
         return true
     case constant.NotReceiveMessage:
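
The net effect of this hunk is a two-level check: the user's global receive option is consulted first, and the per-conversation option only gets a say when the global option is plain ReceiveMessage. A distilled sketch of that precedence, using local stand-in constants rather than the repo's constant package:

```go
package recvopt

// Local stand-ins for constant.ReceiveMessage / NotReceiveMessage / ReceiveNotNotifyMessage;
// the numeric values are illustrative, not taken from the repo.
const (
	ReceiveMessage          = 0 // deliver and push normally
	NotReceiveMessage       = 1 // drop the message for this user
	ReceiveNotNotifyMessage = 2 // deliver, but suppress offline push
)

// ShouldSend mirrors the new flow in modifyMessageByUserMessageReceiveOpt:
// the global option wins outright, and the per-conversation option is only
// consulted when the global option is plain ReceiveMessage.
func ShouldSend(globalOpt, conversationOpt int) (send bool, muteOfflinePush bool) {
	switch globalOpt {
	case NotReceiveMessage:
		return false, false
	case ReceiveNotNotifyMessage:
		return true, true
	}
	switch conversationOpt {
	case NotReceiveMessage:
		return false, false
	case ReceiveNotNotifyMessage:
		return true, true
	}
	return true, false
}
```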

View File

@@ -391,12 +391,18 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI
             log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetUserGlobalMsgRecvOpt failed ", err.Error(), user)
             return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
         }
-    }
-    err := imdb.UpdateUserInfo(user, m)
+        err = imdb.UpdateUserInfoByMap(user, m)
         if err != nil {
             log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), user)
             return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
         }
+    } else {
+        err := imdb.UpdateUserInfo(user)
+        if err != nil {
+            log.NewError(req.OperationID, "UpdateUserInfo failed ", err.Error(), user)
+            return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
+        }
+    }
     etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName, req.OperationID)
     if etcdConn == nil {
         errMsg := req.OperationID + "getcdv3.GetConn == nil"

View File

@@ -203,6 +203,10 @@ type config struct {
             Addr  []string `yaml:"addr"`
             Topic string   `yaml:"topic"`
         }
+        MsgToMongo struct {
+            Addr  []string `yaml:"addr"`
+            Topic string   `yaml:"topic"`
+        }
         Ms2pschat struct {
             Addr  []string `yaml:"addr"`
             Topic string   `yaml:"topic"`
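
This struct mirrors the msgtomongo block added to config.yaml earlier in the commit. The yaml tag on the MsgToMongo field itself is not visible in this hunk, so the sketch below assumes it matches the lower-case key used in the YAML file:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// kafkaConf copies the shape of the new MsgToMongo block; the "msgtomongo"
// yaml tag on the field is an assumption (the hunk does not show it).
type kafkaConf struct {
	MsgToMongo struct {
		Addr  []string `yaml:"addr"`
		Topic string   `yaml:"topic"`
	} `yaml:"msgtomongo"`
}

func main() {
	raw := []byte(`msgtomongo:
  addr: [ "127.0.0.1:9092" ]
  topic: "msg_to_mongo"
`)
	var c kafkaConf
	if err := yaml.Unmarshal(raw, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.MsgToMongo.Addr, c.MsgToMongo.Topic) // [127.0.0.1:9092] msg_to_mongo
}
```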

View File

@@ -120,7 +120,7 @@ func init() {
     })
     //DB.rdb = go_redis.NewClusterClient(&go_redis.ClusterOptions{
     //	Addrs:    []string{config.Config.Redis.DBAddress},
-    //	PoolSize: 100,
+    //	PoolSize: 50,
     //Password: config.Config.Redis.DBPassWord,
     //})
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

View File

@@ -96,16 +96,24 @@ func GetUserNameByUserID(userID string) (string, error) {
     return user.Nickname, nil
 }
-func UpdateUserInfo(user db.User, m ...map[string]interface{}) error {
+func UpdateUserInfo(user db.User) error {
     dbConn, err := db.DB.MysqlDB.DefaultGormDB()
     if err != nil {
         return err
     }
     dbConn.LogMode(false)
     err = dbConn.Table("users").Where("user_id=?", user.UserID).Update(&user).Error
-    if len(m) > 0 {
-        err = dbConn.Table("users").Where("user_id=?", user.UserID).Updates(m[0]).Error
-    }
+    return err
+}
+func UpdateUserInfoByMap(user db.User, m map[string]interface{}) error {
+    dbConn, err := db.DB.MysqlDB.DefaultGormDB()
+    if err != nil {
+        return err
+    }
+    dbConn.LogMode(false)
+    err = dbConn.Table("users").Where("user_id=?", user.UserID).Updates(m).Error
     return err
 }
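
A likely reason for splitting out UpdateUserInfoByMap: with GORM v1 (jinzhu/gorm), updating from a struct skips zero-value fields, so clearing a column to "" or 0 silently does nothing, while a map update writes the column explicitly. A minimal sketch of the difference, assuming the users table and its ex column as used above:

```go
package userdb

import (
	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/mysql"
)

// clearEx empties the ex column for one user. With GORM v1, the struct form
//   dbConn.Table("users").Where("user_id=?", id).Updates(db.User{Ex: ""})
// would skip Ex because "" is the zero value; the map form below writes it,
// which is what UpdateUserInfoByMap makes possible.
func clearEx(dbConn *gorm.DB, userID string) error {
	return dbConn.Table("users").
		Where("user_id=?", userID).
		Updates(map[string]interface{}{"ex": ""}).Error
}
```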