From 333453df79c57d4b0da2baa4e057823dcd8b93ce Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 23 Dec 2021 17:19:57 +0800 Subject: [PATCH] send msg file modify --- internal/rpc/{chat => msg}/send_msg.go | 264 +++++++++++++++---------- 1 file changed, 156 insertions(+), 108 deletions(-) rename internal/rpc/{chat => msg}/send_msg.go (61%) diff --git a/internal/rpc/chat/send_msg.go b/internal/rpc/msg/send_msg.go similarity index 61% rename from internal/rpc/chat/send_msg.go rename to internal/rpc/msg/send_msg.go index 3403e84bc..de52d7022 100644 --- a/internal/rpc/chat/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -1,8 +1,6 @@ -package chat +package msg import ( - "Open_IM/internal/api/group" - "Open_IM/internal/push/content_struct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" @@ -13,9 +11,11 @@ import ( pbChat "Open_IM/pkg/proto/chat" pbGroup "Open_IM/pkg/proto/group" open_im_sdk "Open_IM/pkg/proto/sdk_ws" + sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" "encoding/json" + "github.com/garyburd/redigo/redis" "math/rand" "net/http" "strconv" @@ -45,46 +45,72 @@ type MsgCallBackResp struct { } } -func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { - replay := pbChat.UserSendMsgResp{} +func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { + msg.ServerMsgID = GetMsgID(msg.SendID) + if msg.SendTime == 0 { + msg.SendTime = utils.GetCurrentTimestampByNano() + } + switch msg.ContentType { + case constant.Text: + fallthrough + case constant.Picture: + fallthrough + case constant.Voice: + fallthrough + case constant.Video: + fallthrough + case constant.File: + fallthrough + case constant.AtText: + fallthrough + case constant.Merger: + fallthrough + case constant.Card: + fallthrough + case constant.Location: + fallthrough + case constant.Custom: + fallthrough + case constant.Quote: + utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, true) + utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, true) + utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, true) + case constant.Revoke: + utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) + case constant.HasReadReceipt: + utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) + case constant.Typing: + utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsSenderSync, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsConversationUpdate, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) + + } +} +func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { + replay := pbChat.SendMsgResp{} log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String()) //if !utils.VerifyToken(pb.Token, pb.SendID) { // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) - serverMsgID := GetMsgID(pb.SendID) - pbData := pbChat.WSToMsgSvrChatMsg{} - pbData.MsgFrom = pb.MsgFrom - pbData.SessionType = pb.SessionType - pbData.ContentType = pb.ContentType - pbData.Content = pb.Content - pbData.RecvID = pb.RecvID - pbData.ForceList = pb.ForceList - pbData.OfflineInfo = pb.OffLineInfo - pbData.Options = pb.Options - pbData.PlatformID = pb.PlatformID - pbData.ClientMsgID = pb.ClientMsgID - pbData.SendID = pb.SendID - pbData.SenderNickName = pb.SenderNickName - pbData.SenderFaceURL = pb.SenderFaceURL - pbData.MsgID = serverMsgID - pbData.OperationID = pb.OperationID - pbData.Token = pb.Token - if pb.SendTime == 0 { - pbData.SendTime = utils.GetCurrentTimestampByNano() - } else { - pbData.SendTime = pb.SendTime - } - options := utils.JsonStringToMap(pbData.Options) - isHistory := utils.GetSwitchFromOptions(options, "history") + rpc.encapsulateMsgData(pb.MsgData) + msgToMQ := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID} + //options := utils.JsonStringToMap(pbData.Options) + isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory) mReq := MsgCallBackReq{ - SendID: pb.SendID, - RecvID: pb.RecvID, - Content: pb.Content, - SendTime: pbData.SendTime, - MsgFrom: pbData.MsgFrom, - ContentType: pb.ContentType, - SessionType: pb.SessionType, - PlatformID: pb.PlatformID, - MsgID: pb.ClientMsgID, + SendID: pb.MsgData.SendID, + RecvID: pb.MsgData.RecvID, + Content: string(pb.MsgData.Content), + SendTime: pb.MsgData.SendTime, + MsgFrom: pb.MsgData.MsgFrom, + ContentType: pb.MsgData.ContentType, + SessionType: pb.MsgData.SessionType, + PlatformID: pb.MsgData.SenderPlatformID, + MsgID: pb.MsgData.ClientMsgID, } if !isHistory { mReq.IsOnlineOnly = true @@ -102,97 +128,65 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* if mResp.ErrCode != 0 { return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0) } else { - pbData.Content = mResp.ResponseResult.ModifiedMsg + pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg) } } } - switch pbData.SessionType { + switch pb.MsgData.SessionType { case constant.SingleChatType: - isSend := modifyMessageByUserMessageReceiveOpt(pbData.RecvID, pbData.SendID, constant.SingleChatType, &pbData) + isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { - err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + msgToMQ.MsgData = pb.MsgData + err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID) if err1 != nil { - log.NewError(pbData.OperationID, "kafka send msg err:RecvID", pbData.RecvID, pbData.String()) + log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) + err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID) if err2 != nil { - log.NewError(pbData.OperationID, "kafka send msg err:SendID", pbData.SendID, pbData.String()) + log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } - return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) case constant.GroupChatType: etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) client := pbGroup.NewGroupClient(etcdConn) req := &pbGroup.GetGroupAllMemberReq{ - GroupID: pbData.RecvID, - Token: pbData.Token, - OperationID: pbData.OperationID, + GroupID: pb.MsgData.GroupID, + Token: pb.Token, + OperationID: pb.OperationID, } reply, err := client.GetGroupAllMember(context.Background(), req) if err != nil { - log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error()) + log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error()) return returnMsg(&replay, pb, 201, err.Error(), "", 0) } - if reply.ErrorCode != 0 { - log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg) - return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0) + if reply.ErrCode != 0 { + log.Error(pb.Token, pb.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrMsg) + return returnMsg(&replay, pb, reply.ErrCode, reply.ErrMsg, "", 0) } - var addUidList []string - switch pbData.ContentType { - case constant.KickGroupMemberTip: - var notification content_struct.NotificationContent - var kickContent group.KickGroupMemberReq - err := utils.JsonStringToStruct(pbData.Content, ¬ification) - if err != nil { - log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) - return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } else { - err := utils.JsonStringToStruct(notification.Detail, &kickContent) - if err != nil { - log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) - return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } - for _, v := range kickContent.UidListInfo { - addUidList = append(addUidList, v.UserId) - } - } - case constant.QuitGroupTip: - addUidList = append(addUidList, pbData.SendID) - default: - } - groupID := pbData.RecvID - for i, v := range reply.MemberList { - pbData.RecvID = v.UserId + " " + groupID - isSend := modifyMessageByUserMessageReceiveOpt(v.UserId, groupID, constant.GroupChatType, &pbData) + groupID := pb.MsgData.GroupID + for _, v := range reply.MemberList { + pb.MsgData.RecvID = v.UserId + isSend := modifyMessageByUserMessageReceiveOpt(v.UserId, groupID, constant.GroupChatType, pb) if isSend { - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + msgToMQ.MsgData = pb.MsgData + err := rpc.sendMsgToKafka(&msgToMQ, v.UserId) if err != nil { - log.NewError(pbData.OperationID, "kafka send msg err:UserId", v.UserId, pbData.String()) + log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v.UserId, msgToMQ.String()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } } - for i, v := range addUidList { - pbData.RecvID = v + " " + groupID - isSend := modifyMessageByUserMessageReceiveOpt(v, groupID, constant.GroupChatType, &pbData) - if isSend { - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) - if err != nil { - log.NewError(pbData.OperationID, "kafka send msg err:UserId", v, pbData.String()) - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - } - } - return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) + return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) default: return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) } } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error { +func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error { pid, offset, err := rpc.producer.SendMessage(m, key) if err != nil { log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) @@ -204,21 +198,20 @@ func GetMsgID(sendID string) string { return t + "-" + sendID + "-" + strconv.Itoa(rand.Int()) } -func returnMsg(replay *pbChat.UserSendMsgResp, pb *pbChat.UserSendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.UserSendMsgResp, error) { +func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) { replay.ErrCode = errCode replay.ErrMsg = errMsg - replay.ReqIdentifier = pb.ReqIdentifier - replay.ClientMsgID = pb.ClientMsgID replay.ServerMsgID = serverMsgID + replay.ClientMsgID = pb.MsgData.ClientMsgID replay.SendTime = sendTime return replay, nil } -func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, msg *pbChat.WSToMsgSvrChatMsg) bool { +func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType int, pb *pbChat.SendMsgReq) bool { conversationID := utils.GetConversationIDBySessionType(sourceID, sessionType) opt, err := db.DB.GetSingleConversationMsgOpt(userID, conversationID) - if err != nil { - log.NewError(msg.OperationID, "GetSingleConversationMsgOpt from redis err", msg.String()) + if err != nil || err != redis.ErrNil { + log.NewError(pb.OperationID, "GetSingleConversationMsgOpt from redis err", pb.String()) return true } switch opt { @@ -227,12 +220,10 @@ func modifyMessageByUserMessageReceiveOpt(userID, sourceID string, sessionType i case constant.NotReceiveMessage: return false case constant.ReceiveNotNotifyMessage: - options := utils.JsonStringToMap(msg.Options) - if options == nil { - options = make(map[string]int32, 2) + if pb.MsgData.Options == nil { + pb.MsgData.Options = make(map[string]bool, 10) } - utils.SetSwitchFromOptions(options, "offlinePush", 0) - msg.Options = utils.MapIntToJsonString(options) + utils.SetSwitchFromOptions(pb.MsgData.Options, constant.IsOfflinePush, false) return true } @@ -250,7 +241,64 @@ type NotificationMsg struct { } func Notification(n *NotificationMsg, onlineUserOnly bool) { - + var req pbChat.SendMsgReq + var msg sdk_ws.MsgData + var offlineInfo sdk_ws.OfflinePushInfo + var title, desc, ext string + var pushSwitch bool + req.OperationID = n.OperationID + msg.SendID = n.SendID + msg.RecvID = n.RecvID + msg.Content = n.Content + msg.MsgFrom = n.MsgFrom + msg.ContentType = n.ContentType + msg.SessionType = n.SessionType + msg.CreateTime = utils.GetCurrentTimestampByNano() + msg.ClientMsgID = utils.GetMsgID(n.SendID) + switch n.SessionType { + case constant.GroupChatType: + msg.RecvID = "" + msg.GroupID = n.RecvID + } + if onlineUserOnly { + msg.Options = make(map[string]bool, 10) + utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsHistory, false) + utils.SetSwitchFromOptions(msg.Options, constant.IsPersistent, false) + } + offlineInfo.IOSBadgeCount = config.Config.IOSPush.BadgeCount + offlineInfo.IOSPushSound = config.Config.IOSPush.PushSound + switch msg.ContentType { + case constant.CreateGroupTip: + pushSwitch = config.Config.Notification.GroupCreated.OfflinePush.PushSwitch + title = config.Config.Notification.GroupCreated.OfflinePush.Title + desc = config.Config.Notification.GroupCreated.OfflinePush.Desc + ext = config.Config.Notification.GroupCreated.OfflinePush.Ext + case constant.ChangeGroupInfoTip: + pushSwitch = config.Config.Notification.GroupInfoChanged.OfflinePush.PushSwitch + title = config.Config.Notification.GroupInfoChanged.OfflinePush.Title + desc = config.Config.Notification.GroupInfoChanged.OfflinePush.Desc + ext = config.Config.Notification.GroupInfoChanged.OfflinePush.Ext + case constant.ApplyJoinGroupTip: + pushSwitch = config.Config.Notification.ApplyJoinGroup.OfflinePush.PushSwitch + title = config.Config.Notification.ApplyJoinGroup.OfflinePush.Title + desc = config.Config.Notification.ApplyJoinGroup.OfflinePush.Desc + ext = config.Config.Notification.ApplyJoinGroup.OfflinePush.Ext + } + utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, pushSwitch) + offlineInfo.Title = title + offlineInfo.Desc = desc + offlineInfo.Ext = ext + msg.OfflinePushInfo = &offlineInfo + req.MsgData = &msg + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + client := pbChat.NewChatClient(etcdConn) + reply, err := client.SendMsg(context.Background(), &req) + if err != nil { + log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), err.Error()) + } else if reply.ErrCode != 0 { + log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String()) + } } //message GroupCreatedTips{