RevokeMsg

This commit is contained in:
withchao 2023-05-23 17:11:56 +08:00
parent 9e7bc1b769
commit 5061047164
9 changed files with 854 additions and 692 deletions

View File

@ -2,6 +2,7 @@ package msgtransfer
import ( import (
"context" "context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
@ -52,7 +53,8 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
} }
for _, v := range msgFromMQ.MsgData { for _, v := range msgFromMQ.MsgData {
if v.ContentType == constant.DeleteMessageNotification { switch v.ContentType {
case constant.DeleteMessageNotification:
deleteMessageTips := sdkws.DeleteMessageTips{} deleteMessageTips := sdkws.DeleteMessageTips{}
err := proto.Unmarshal(v.Content, &deleteMessageTips) err := proto.Unmarshal(v.Content, &deleteMessageTips)
if err != nil { if err != nil {
@ -61,10 +63,31 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
} }
if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil {
log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs)
continue
}
case constant.MsgRevokeNotification:
var elem sdkws.NotificationElem
if err := json.Unmarshal(v.Content, &elem); err != nil {
log.ZError(ctx, "json.Unmarshal NotificationElem", err, "content", string(v.Content))
continue
}
var tips sdkws.RevokeMsgTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "json.Unmarshal RevokeMsgTips", err, "content", string(v.Content))
continue
}
v.Seq = tips.Seq
data, err := proto.Marshal(v)
if err != nil {
log.ZError(ctx, "proto.Marshal MsgData", err)
continue
}
if err := mc.msgDatabase.RevokeMsg(ctx, tips.ConversationID, tips.Seq, data); err != nil {
log.ZError(ctx, "RevokeMsg", err, "conversationID", tips.ConversationID, "seq", tips.Seq)
continue
} }
} }
} }
} }
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

View File

@ -2,19 +2,77 @@ package msg
import ( import (
"context" "context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
) )
func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) { func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.RevokeMsgResp, error) {
//msgs := []*sdkws.MsgData{} if req.UserID == "" {
//if err := m.MsgDatabase.MsgToMongoMQ(ctx, "", req.ConversationID, msgs, 0); err != nil { return nil, errs.ErrArgs.Wrap("user_id is empty")
// return nil, err }
//} if req.RecvID == "" && req.GroupID == "" {
//msg := sdkws.MsgData{ return nil, errs.ErrArgs.Wrap("recv_id and group_id are empty")
// SendID: "", }
//} if req.RecvID != "" && req.GroupID != "" {
// return nil, errs.ErrArgs.Wrap("recv_id and group_id cannot exist at the same time")
//m.SendMsg(ctx, &msg.SendMsgReq{}) }
if err := tokenverify.CheckAccessV3(ctx, req.RecvID); err != nil {
return nil, err
}
var sessionType int32
var conversationID string
if req.GroupID == "" {
sessionType = constant.SingleChatType
conversationID = utils.GenConversationUniqueKeyForSingle(req.UserID, req.RecvID)
} else {
sessionType = constant.SuperGroupChatType
conversationID = utils.GenConversationUniqueKeyForGroup(req.GroupID)
}
tips := sdkws.RevokeMsgTips{
RevokerUserID: req.UserID,
ClientMsgID: "",
RevokeTime: utils.GetCurrentTimestampByMill(),
Seq: req.Seq,
SesstionType: sessionType,
ConversationID: conversationID,
}
detail, err := json.Marshal(&tips)
if err != nil {
return nil, err
}
notificationElem := sdkws.NotificationElem{Detail: string(detail)}
content, err := json.Marshal(&notificationElem)
if err != nil {
return nil, errs.Wrap(err)
}
msgData := sdkws.MsgData{
SendID: req.UserID,
RecvID: req.RecvID,
GroupID: req.GroupID,
Content: content,
MsgFrom: constant.SysMsgType,
ContentType: constant.MsgRevokeNotification,
SessionType: sessionType,
CreateTime: utils.GetCurrentTimestampByMill(),
ClientMsgID: utils.GetMsgID(req.UserID),
Options: config.GetOptionsByNotification(config.NotificationConf{
IsSendMsg: true,
ReliabilityLevel: 2,
}),
OfflinePushInfo: nil,
}
if msgData.SessionType == constant.SuperGroupChatType {
msgData.GroupID = msgData.RecvID
}
_, err = m.SendMsg(ctx, &msg.SendMsgReq{MsgData: &msgData})
if err != nil {
return nil, err
}
return &msg.RevokeMsgResp{}, nil return &msg.RevokeMsgResp{}, nil
} }

View File

@ -84,6 +84,8 @@ const (
ConversationPrivateChatNotification = 1701 ConversationPrivateChatNotification = 1701
ConversationUnreadNotification = 1702 ConversationUnreadNotification = 1702
MsgRevokeNotification = 1750
BusinessNotificationBegin = 2000 BusinessNotificationBegin = 2000
BusinessNotification = 2001 BusinessNotification = 2001
BusinessNotificationEnd = 2099 BusinessNotificationEnd = 2099

View File

@ -32,6 +32,8 @@ import (
type CommonMsgDatabase interface { type CommonMsgDatabase interface {
// 批量插入消息 // 批量插入消息
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
// 撤回消息
RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error
// 刪除redis中消息缓存 // 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
// incrSeq然后批量插入缓存 // incrSeq然后批量插入缓存
@ -222,6 +224,12 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
return nil return nil
} }
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, msg []byte) error {
index := seq / db.msg.GetSingleGocMsgNum()
docID := db.msg.IndexDocID(conversationID, index)
return db.msgDocDatabase.UpdateMsgContent(ctx, docID, seq%db.msg.GetSingleGocMsgNum(), msg)
}
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error { func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs) return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
} }

View File

@ -29,6 +29,7 @@ type MsgDocModelInterface interface {
PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error
Create(ctx context.Context, model *MsgDocModel) error Create(ctx context.Context, model *MsgDocModel) error
UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error UpdateMsg(ctx context.Context, docID string, index int64, info *MsgInfoModel) error
UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error
IsExistDocID(ctx context.Context, docID string) (bool, error) IsExistDocID(ctx context.Context, docID string) (bool, error)
UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error
FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error)

View File

@ -46,6 +46,14 @@ func (m *MsgMongoDriver) UpdateMsg(ctx context.Context, docID string, index int6
return nil return nil
} }
func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error {
_, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}})
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
msg.Status = status msg.Status = status
bytes, err := proto.Marshal(msg) bytes, err := proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -329,7 +329,6 @@ message GroupMemberInfoSetTips{
GroupMemberFullInfo changedUser = 4; GroupMemberFullInfo changedUser = 4;
} }
//////////////////////friend///////////////////// //////////////////////friend/////////////////////
message FriendApplication{ message FriendApplication{
@ -411,6 +410,10 @@ message ConversationHasReadTips {
int64 unreadCountTime = 4; int64 unreadCountTime = 4;
} }
message NotificationElem {
string detail = 1;
}
////////////////////message/////////////////////// ////////////////////message///////////////////////
message seqs { message seqs {
repeated int64 seqs = 1; repeated int64 seqs = 1;
@ -423,11 +426,11 @@ message DeleteMessageTips{
} }
message RevokeMsgTip{ message RevokeMsgTips{
string revokerUserID = 1; string revokerUserID = 1;
string clientMsgID = 2; string clientMsgID = 2;
int64 revokeTime = 3; int64 revokeTime = 3;
int64 sesstionType = 5; int32 sesstionType = 5;
int64 seq = 6; int64 seq = 6;
string conversationID = 7; string conversationID = 7;
} }

View File

@ -134,13 +134,9 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes
return resp, err return resp, err
} }
type NotificationElem struct {
Detail string `json:"detail,omitempty"`
}
func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...utils.OptionsOpt) error { func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...utils.OptionsOpt) error {
n := NotificationElem{Detail: utils.StructToJsonString(m)} n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)}
content, err := json.Marshal(n) content, err := json.Marshal(&n)
if err != nil { if err != nil {
log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", m) log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", m)
return err return err