diff --git a/config/config.yaml b/config/config.yaml index a0e66fae2..736b15bce 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -41,11 +41,15 @@ kafka: ws2mschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ws2ms_chat" + ws2mschatoffline: + addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 + topic: "ws2ms_chat_offline" ms2pschat: addr: [ 127.0.0.1:9092 ] #kafka配置,默认即可 topic: "ms2ps_chat" consumergroupid: msgToMongo: mongo + msgToMongoOffline: mongo_offline msgToMySql: mysql msgToPush: push diff --git a/internal/api/group/group.go b/internal/api/group/group.go index 598f3f473..08ee1f274 100644 --- a/internal/api/group/group.go +++ b/internal/api/group/group.go @@ -287,6 +287,7 @@ func CreateGroup(c *gin.Context) { req.OwnerUserID = params.OwnerUserID req.OperationID = params.OperationID + log.NewInfo(req.OperationID, "CreateGroup args ", req.String()) etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index a67b9cff4..c382de7f0 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -2,20 +2,40 @@ package logic import ( "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" + "Open_IM/pkg/common/log" + "Open_IM/pkg/statistics" "fmt" + "sync" ) +const OnlineTopicBusy = 1 +const OnlineTopicVacancy = 0 + var ( - persistentCH PersistentConsumerHandler - historyCH HistoryConsumerHandler - producer *kafka.Producer + persistentCH PersistentConsumerHandler + historyCH OnlineHistoryConsumerHandler + offlineHistoryCH OfflineHistoryConsumerHandler + producer *kafka.Producer + cmdCh chan Cmd2Value + onlineTopicStatus int + w *sync.Mutex + singleMsgSuccessCount uint64 + groupMsgCount uint64 + singleMsgFailedCount uint64 ) func Init() { - + cmdCh = make(chan Cmd2Value, 10000) + w = new(sync.Mutex) persistentCH.Init() - historyCH.Init() + historyCH.Init(cmdCh) + onlineTopicStatus = OnlineTopicVacancy + log.Debug("come msg transfer ts", config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline, config.Config.Kafka.Ws2mschatOffline.Topic) + offlineHistoryCH.Init(cmdCh) + statistics.NewStatistics(&singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + statistics.NewStatistics(&groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) } func Run() { @@ -26,4 +46,15 @@ func Run() { fmt.Println("not start mysql consumer") } go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) + go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH) +} +func SetOnlineTopicStatus(status int) { + w.Lock() + defer w.Unlock() + onlineTopicStatus = status +} +func GetOnlineTopicStatus() int { + w.Lock() + defer w.Unlock() + return onlineTopicStatus } diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go new file mode 100644 index 000000000..9355385d0 --- /dev/null +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -0,0 +1,124 @@ +package logic + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + kfk "Open_IM/pkg/common/kafka" + "Open_IM/pkg/common/log" + pbMsg "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" + "time" +) + +type OfflineHistoryConsumerHandler struct { + msgHandle map[string]fcb + cmdCh chan Cmd2Value + historyConsumerGroup *kfk.MConsumerGroup +} + +func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { + mc.msgHandle = make(map[string]fcb) + mc.cmdCh = cmdCh + mc.msgHandle[config.Config.Kafka.Ws2mschatOffline.Topic] = mc.handleChatWs2Mongo + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschatOffline.Topic}, + config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongoOffline) + +} + +func (mc *OfflineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { + now := time.Now() + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg, &msgFromMQ) + if err != nil { + log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) + return + } + operationID := msgFromMQ.OperationID + log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) + //Control whether to store offline messages (mongo) + isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) + //Control whether to store history messages (mysql) + isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) + isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) + switch msgFromMQ.MsgData.SessionType { + case constant.SingleChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgKey, &msgFromMQ) + if err != nil { + singleMsgFailedCount++ + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + return + } + singleMsgSuccessCount++ + log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + case constant.GroupChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) + if err != nil { + log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) + return + } + groupMsgCount++ + } + go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) + case constant.NotificationChatType: + log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) + if isHistory { + err := saveUserChat(msgKey, &msgFromMQ) + if err != nil { + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) + return + } + log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) + } + if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { + } else { + go sendMessageToPush(&msgFromMQ, msgKey) + } + log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) + default: + log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) + return + } + log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) +} + +func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + //log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + //for msg := range claim.Messages() { + // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") + // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + //} + for msg := range claim.Messages() { + if GetOnlineTopicStatus() == OnlineTopicVacancy { + log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } else { + select { + case <-mc.cmdCh: + log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + case <-time.After(time.Millisecond * time.Duration(100)): + log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + } + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } + } + + return nil +} diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go similarity index 67% rename from internal/msg_transfer/logic/history_msg_handler.go rename to internal/msg_transfer/logic/online_history_msg_handler.go index 27d84d673..9bf13e57a 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -8,10 +8,9 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" - "Open_IM/pkg/statistics" "Open_IM/pkg/utils" "context" - "fmt" + "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "strings" @@ -19,28 +18,52 @@ import ( ) type fcb func(msg []byte, msgKey string) - -type HistoryConsumerHandler struct { - msgHandle map[string]fcb - historyConsumerGroup *kfk.MConsumerGroup - singleMsgFailedCount uint64 - singleMsgSuccessCount uint64 - groupMsgCount uint64 +type Cmd2Value struct { + Cmd int + Value interface{} +} +type OnlineHistoryConsumerHandler struct { + msgHandle map[string]fcb + historyConsumerGroup *kfk.MConsumerGroup + cmdCh chan Cmd2Value } -func (mc *HistoryConsumerHandler) Init() { - statistics.NewStatistics(&mc.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) - statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second groupMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) - - mc.msgHandle = make(map[string]fcb) - mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, +func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { + och.msgHandle = make(map[string]fcb) + och.cmdCh = cmdCh + och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo + och.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) } - -func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { +func (och *OnlineHistoryConsumerHandler) TriggerCmd(status int) { + operationID := utils.OperationIDGenerator() + for { + err := sendCmd(och.cmdCh, Cmd2Value{Cmd: status, Value: ""}, 1) + if err != nil { + log.Error(operationID, "TriggerCmd failed ", err.Error(), status) + continue + } + log.Debug(operationID, "TriggerCmd success", status) + return + } +} +func sendCmd(ch chan Cmd2Value, value Cmd2Value, timeout int64) error { + var flag = 0 + select { + case ch <- value: + flag = 1 + case <-time.After(time.Second * time.Duration(timeout)): + flag = 2 + } + if flag == 1 { + return nil + } else { + return errors.New("send cmd timeout") + } +} +func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { now := time.Now() msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(msg, &msgFromMQ) @@ -61,11 +84,11 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) if isHistory { err := saveUserChat(msgKey, &msgFromMQ) if err != nil { - mc.singleMsgFailedCount++ + singleMsgFailedCount++ log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } - mc.singleMsgSuccessCount++ + singleMsgSuccessCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { @@ -81,7 +104,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) return } - mc.groupMsgCount++ + groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) case constant.NotificationChatType: @@ -106,14 +129,22 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) } -func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (HistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *HistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim) error { +func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + SetOnlineTopicStatus(OnlineTopicBusy) + //och.TriggerCmd(OnlineTopicBusy) + log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) + och.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "") + if claim.HighWaterMarkOffset()-msg.Offset <= 1 { + log.Debug("", "online msg consume end", claim.HighWaterMarkOffset(), msg.Offset) + SetOnlineTopicStatus(OnlineTopicVacancy) + och.TriggerCmd(OnlineTopicVacancy) + } } return nil } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index ccc8acf96..22fcc61af 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -97,8 +97,11 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeCreateGroup failed") } } - //Time stamp + MD5 to generate group chat id - groupId := utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10)) + + groupId := req.GroupInfo.GroupID + if groupId == "" { + groupId = utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10)) + } //to group groupInfo := db.Group{} utils.CopyStructFields(&groupInfo, req.GroupInfo) @@ -109,15 +112,19 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR log.NewError(req.OperationID, "InsertIntoGroup failed, ", err.Error(), groupInfo) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB) } - - us, err := imdb.GetUserByUserID(req.OwnerUserID) + groupMember := db.GroupMember{} + us := &db.User{} + if req.OwnerUserID == "" { + goto initMemberList + } + us, err = imdb.GetUserByUserID(req.OwnerUserID) if err != nil { log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OwnerUserID) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB) } //to group member - groupMember := db.GroupMember{GroupID: groupId, RoleLevel: constant.GroupOwner, OperatorUserID: req.OpUserID} + groupMember = db.GroupMember{GroupID: groupId, RoleLevel: constant.GroupOwner, OperatorUserID: req.OpUserID} utils.CopyStructFields(&groupMember, us) err = imdb.InsertIntoGroupMember(groupMember) if err != nil { @@ -125,10 +132,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB) } - err = db.DB.AddGroupMember(groupId, req.OwnerUserID) - if err != nil { - log.NewError(req.OperationID, "AddGroupMember failed ", err.Error(), groupId, req.OwnerUserID) - } +initMemberList: var okUserIDList []string //to group member for _, user := range req.InitMemberList { @@ -148,20 +152,14 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR log.NewError(req.OperationID, "InsertIntoGroupMember failed ", err.Error(), groupMember) continue } - okUserIDList = append(okUserIDList, user.UserID) - // mongoDB method - //err = db.DB.AddGroupMember(groupId, user.UserID) - //if err != nil { - // log.NewError(req.OperationID, "add mongo group member failed, db.DB.AddGroupMember failed ", err.Error()) - //} } resp := &pbGroup.CreateGroupResp{GroupInfo: &open_im_sdk.GroupInfo{}} group, err := imdb.GetGroupInfoByGroupID(groupId) if err != nil { log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", err.Error(), groupId) resp.ErrCode = constant.ErrDB.ErrCode - resp.ErrMsg = constant.ErrDB.ErrMsg + resp.ErrMsg = err.Error() return resp, nil } utils.CopyStructFields(resp.GroupInfo, group) @@ -169,32 +167,39 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR if err != nil { log.NewError(req.OperationID, "GetGroupMemberNumByGroupID failed ", err.Error(), groupId) resp.ErrCode = constant.ErrDB.ErrCode - resp.ErrMsg = constant.ErrDB.ErrMsg + resp.ErrMsg = err.Error() return resp, nil } - resp.GroupInfo.OwnerUserID = req.OwnerUserID - - okUserIDList = append(okUserIDList, req.OwnerUserID) - addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{ - UserIDList: okUserIDList, - GroupID: groupId, - OperationID: req.OperationID, - } - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) - cacheClient := pbCache.NewCacheClient(etcdConn) - cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq) - if err != nil { - log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error()) - return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil - } - if cacheResp.CommonResp.ErrCode != 0 { - log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String()) - return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil + if req.OwnerUserID != "" { + resp.GroupInfo.OwnerUserID = req.OwnerUserID + okUserIDList = append(okUserIDList, req.OwnerUserID) } - log.NewInfo(req.OperationID, "rpc CreateGroup return ", resp.String()) - chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, okUserIDList) - return resp, nil + if len(okUserIDList) != 0 { + addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{ + UserIDList: okUserIDList, + GroupID: groupId, + OperationID: req.OperationID, + } + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) + cacheClient := pbCache.NewCacheClient(etcdConn) + cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq) + if err != nil { + log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error()) + return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil + } + if cacheResp.CommonResp.ErrCode != 0 { + log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String()) + return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil + } + + log.NewInfo(req.OperationID, "rpc CreateGroup return ", resp.String()) + chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, okUserIDList) + return resp, nil + } else { + log.NewInfo(req.OperationID, "rpc CreateGroup return ", resp.String()) + return resp, nil + } } func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJoinedGroupListReq) (*pbGroup.GetJoinedGroupListResp, error) { diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index 8bed05810..002918bae 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -19,7 +19,8 @@ type rpcChat struct { rpcRegisterName string etcdSchema string etcdAddr []string - producer *kafka.Producer + onlineProducer *kafka.Producer + offlineProducer *kafka.Producer } func NewRpcChatServer(port int) *rpcChat { @@ -30,7 +31,8 @@ func NewRpcChatServer(port int) *rpcChat { etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } - rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + rc.onlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic) return &rc } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 78588cdf1..0cb78ca0a 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -10,9 +10,11 @@ import ( pbCache "Open_IM/pkg/proto/cache" pbChat "Open_IM/pkg/proto/chat" pbConversation "Open_IM/pkg/proto/conversation" + pbRelay "Open_IM/pkg/proto/relay" sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" + "errors" "github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "math/rand" @@ -188,14 +190,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if isSend { msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -252,72 +254,33 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S default: } + onUserIDList, offUserIDList := getOnlineAndOfflineUserIDList(memberUserIDList, pb.OperationID) + log.Debug(pb.OperationID, onUserIDList, offUserIDList) groupID := pb.MsgData.GroupID //split parallel send var wg sync.WaitGroup var sendTag bool var split = 50 - remain := len(memberUserIDList) % split - for i := 0; i < len(memberUserIDList)/split; i++ { + remain := len(onUserIDList) % split + for i := 0; i < len(onUserIDList)/split; i++ { wg.Add(1) - go func(list []string) { - // log.Debug(pb.OperationID, "split userID ", list) - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} - *groupPB.MsgData = *pb.MsgData - if pb.MsgData.OfflinePushInfo != nil { - *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} - for _, v := range list { - groupPB.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToKafka(&msgToMQGroup, v) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) - } - } - wg.Done() - }(memberUserIDList[i*split : (i+1)*split]) + go rpc.sendMsgToGroup(onUserIDList[i*split:(i+1)*split], pb, constant.OnlineStatus, &sendTag, &wg) } if remain > 0 { wg.Add(1) - go func(list []string) { - // log.Debug(pb.OperationID, "split userID ", list) - groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} - *groupPB.MsgData = *pb.MsgData - if pb.MsgData.OfflinePushInfo != nil { - *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} - for _, v := range list { - groupPB.MsgData.RecvID = v - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &groupPB) - if isSend { - msgToMQGroup.MsgData = groupPB.MsgData - // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) - err := rpc.sendMsgToKafka(&msgToMQGroup, v) - if err != nil { - log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) - } else { - sendTag = true - } - } else { - log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) - } - } - wg.Done() - }(memberUserIDList[split*(len(memberUserIDList)/split):]) + go rpc.sendMsgToGroup(onUserIDList[split*(len(onUserIDList)/split):], pb, constant.OnlineStatus, &sendTag, &wg) + } + wg.Wait() + remain = len(offUserIDList) % split + for i := 0; i < len(offUserIDList)/split; i++ { + wg.Add(1) + go rpc.sendMsgToGroup(offUserIDList[i*split:(i+1)*split], pb, constant.OfflineStatus, &sendTag, &wg) + } + if remain > 0 { + wg.Add(1) + go rpc.sendMsgToGroup(offUserIDList[split*(len(offUserIDList)/split):], pb, constant.OfflineStatus, &sendTag, &wg) } wg.Wait() - log.Info(msgToMQSingle.OperationID, "addUidList", addUidList) for _, v := range addUidList { pb.MsgData.RecvID = v @@ -325,7 +288,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Info(msgToMQSingle.OperationID, "isSend", isSend) if isSend { msgToMQSingle.MsgData = pb.MsgData - err := rpc.sendMsgToKafka(&msgToMQSingle, v) + err := rpc.sendMsgToKafka(&msgToMQSingle, v, constant.OnlineStatus) if err != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:UserId", v, msgToMQSingle.String()) } else { @@ -397,14 +360,14 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S case constant.NotificationChatType: msgToMQSingle.MsgData = pb.MsgData log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) - err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID) + err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID) + err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus) if err2 != nil { log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -416,12 +379,22 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error { - pid, offset, err := rpc.producer.SendMessage(m, key) - if err != nil { - log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) +func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { + switch status { + case constant.OnlineStatus: + pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + if err != nil { + log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) + } + return err + case constant.OfflineStatus: + pid, offset, err := rpc.offlineProducer.SendMessage(m, key) + if err != nil { + log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) + } + return err } - return err + return errors.New("status error") } func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") @@ -736,3 +709,68 @@ func Notification(n *NotificationMsg) { log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg) } } +func getOnlineAndOfflineUserIDList(memberList []string, operationID string) (onllUserIDList []string, offlUserIDList []string) { + var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult + req := &pbRelay.GetUsersOnlineStatusReq{} + req.UserIDList = memberList + req.OperationID = operationID + req.OpUserID = config.Config.Manager.AppManagerUid[0] + flag := false + grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + for _, v := range grpcCons { + client := pbRelay.NewOnlineMessageRelayServiceClient(v) + reply, err := client.GetUsersOnlineStatus(context.Background(), req) + if err != nil { + log.NewError(operationID, "GetUsersOnlineStatus rpc err", req.String(), err.Error()) + continue + } else { + if reply.ErrCode == 0 { + wsResult = append(wsResult, reply.SuccessResult...) + } + } + } + log.NewInfo(operationID, "call GetUsersOnlineStatus rpc server is success", wsResult) + //Online data merge of each node + for _, v1 := range memberList { + flag = false + + for _, v2 := range wsResult { + if v2.UserID == v1 { + flag = true + onllUserIDList = append(onllUserIDList, v1) + } + + } + if !flag { + offlUserIDList = append(offlUserIDList, v1) + } + } + return onllUserIDList, offlUserIDList +} + +func (rpc *rpcChat) sendMsgToGroup(list []string, pb *pbChat.SendMsgReq, status string, sendTag *bool, wg *sync.WaitGroup) { + // log.Debug(pb.OperationID, "split userID ", list) + groupPB := pbChat.SendMsgReq{Token: pb.Token, OperationID: pb.OperationID, MsgData: &sdk_ws.MsgData{OfflinePushInfo: &sdk_ws.OfflinePushInfo{}}} + *groupPB.MsgData = *pb.MsgData + if pb.MsgData.OfflinePushInfo != nil { + *groupPB.MsgData.OfflinePushInfo = *pb.MsgData.OfflinePushInfo + } + msgToMQGroup := pbChat.MsgDataToMQ{Token: groupPB.Token, OperationID: groupPB.OperationID, MsgData: groupPB.MsgData} + for _, v := range list { + groupPB.MsgData.RecvID = v + isSend := modifyMessageByUserMessageReceiveOpt(v, groupPB.MsgData.GroupID, constant.GroupChatType, &groupPB) + if isSend { + msgToMQGroup.MsgData = groupPB.MsgData + // log.Debug(groupPB.OperationID, "sendMsgToKafka, ", v, groupID, msgToMQGroup.String()) + err := rpc.sendMsgToKafka(&msgToMQGroup, v, status) + if err != nil { + log.NewError(msgToMQGroup.OperationID, "kafka send msg err:UserId", v, msgToMQGroup.String()) + } else { + *sendTag = true + } + } else { + log.Debug(groupPB.OperationID, "not sendMsgToKafka, ", v) + } + } + wg.Done() +} diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 0f628b77d..90f983a05 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -405,14 +405,28 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI chat.UserInfoUpdatedNotification(req.OperationID, req.UserInfo.UserID, v.FriendUser.UserID) } - //etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImFriendName) - //client := pbFriend.NewFriendClient(etcdConn) - chat.UserInfoUpdatedNotification(req.OperationID, req.UserInfo.UserID, req.OpUserID) log.Info(req.OperationID, "UserInfoUpdatedNotification ", req.UserInfo.UserID, req.OpUserID) + if req.UserInfo.FaceURL != "" { + go s.SyncJoinedGroupMemberFaceURL(req.UserInfo.UserID, req.UserInfo.FaceURL, req.OperationID, req.OpUserID) + } + return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{}}, nil } +func (s *userServer) SyncJoinedGroupMemberFaceURL(userID string, faceURL string, operationID string, opUserID string) { + joinedGroupIDList, err := imdb.GetJoinedGroupIDListByUserID(userID) + if err != nil { + log.NewWarn(operationID, "GetJoinedGroupIDListByUserID failed ", userID, err.Error()) + return + } + for _, v := range joinedGroupIDList { + groupMemberInfo := db.GroupMember{UserID: userID, GroupID: v, FaceURL: faceURL} + imdb.UpdateGroupMemberInfo(groupMemberInfo) + chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID) + } +} + func (s *userServer) GetUsersByName(ctx context.Context, req *pbUser.GetUsersByNameReq) (*pbUser.GetUsersByNameResp, error) { log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req.String()) resp := &pbUser.GetUsersByNameResp{} diff --git a/pkg/base_info/group_api_struct.go b/pkg/base_info/group_api_struct.go index 94a152818..169cb0b6e 100644 --- a/pkg/base_info/group_api_struct.go +++ b/pkg/base_info/group_api_struct.go @@ -81,8 +81,8 @@ type GetGroupAllMemberResp struct { } type CreateGroupReq struct { - MemberList []*GroupAddMemberInfo `json:"memberList" binding:"required"` - OwnerUserID string `json:"ownerUserID" binding:"required"` + MemberList []*GroupAddMemberInfo `json:"memberList"` + OwnerUserID string `json:"ownerUserID"` GroupType int32 `json:"groupType"` GroupName string `json:"groupName"` Notification string `json:"notification"` @@ -90,6 +90,7 @@ type CreateGroupReq struct { FaceURL string `json:"faceURL"` Ex string `json:"ex"` OperationID string `json:"operationID" binding:"required"` + GroupID string `json:"groupID"` } type CreateGroupResp struct { CommResp diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index af51cda21..e898e5aa6 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -192,14 +192,19 @@ type config struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } + Ws2mschatOffline struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } Ms2pschat struct { Addr []string `yaml:"addr"` Topic string `yaml:"topic"` } ConsumerGroupID struct { - MsgToMongo string `yaml:"msgToMongo"` - MsgToMySql string `yaml:"msgToMySql"` - MsgToPush string `yaml:"msgToPush"` + MsgToMongo string `yaml:"msgToMongo"` + MsgToMongoOffline string `yaml:"msgToMongoOffline"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` } } Secret string `yaml:"secret"`