Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao 2023-04-21 16:45:48 +08:00
commit d891a1ca8e
6 changed files with 50 additions and 62 deletions

View File

@ -2,6 +2,7 @@ package msggateway
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
@ -61,7 +62,7 @@ func (g GrpcHandler) GetSeq(context context.Context, data Req) ([]byte, error) {
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, err
}
if err := g.validate.Struct(req); err != nil {
if err := g.validate.Struct(&req); err != nil {
return nil, err
}
resp, err := g.notification.Msg.GetMaxAndMinSeq(context, &req)
@ -100,7 +101,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data Req) ([]byt
if err := proto.Unmarshal(data.Data, &signalReq); err != nil {
return nil, err
}
if err := g.validate.Struct(signalReq); err != nil {
if err := g.validate.Struct(&signalReq); err != nil {
return nil, err
}
//req := pbRtc.SignalMessageAssembleReq{SignalReq: &signalReq, OperationID: "111"}

View File

@ -145,10 +145,6 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil
}
//haveUserID, err := c.ConversationDatabase.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
//if err != nil {
// return nil, err
//}
filedMap := make(map[string]interface{})
switch req.FieldType {
case constant.FieldRecvMsgOpt:

View File

@ -34,7 +34,7 @@ type ConversationCache interface {
// get one conversation from cache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache
DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache
DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
// get one conversation from cache
GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
// get one user's all conversations from cache
@ -169,7 +169,7 @@ func (c *ConversationRedisCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx contex
})
}
func (c *ConversationRedisCache) DelUsersConversation(ownerUserIDs []string, conversationID string) ConversationCache {
func (c *ConversationRedisCache) DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache {
var keys []string
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID))

View File

@ -47,48 +47,49 @@ type ConversationDataBase struct {
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
return c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, conversation.ConversationID)
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID})
if err != nil {
return err
}
cache := c.cache.NewCache()
if len(haveUserIDs) > 0 {
err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
_, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
if err != nil {
return err
}
cache = cache.DelUsersConversation(conversation.ConversationID, haveUserIDs...)
}
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs)
log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
var cList []*relationTb.ConversationModel
var conversations []*relationTb.ConversationModel
for _, v := range NotUserIDs {
temp := new(relationTb.ConversationModel)
if err := utils.CopyStructFields(temp, conversation); err != nil {
return err
}
temp.OwnerUserID = v
cList = append(cList, temp)
conversations = append(conversations, temp)
}
cache := c.cache.NewCache()
if len(cList) > 0 {
err = conversationTx.Create(ctx, cList)
if len(conversations) > 0 {
err = conversationTx.Create(ctx, conversations)
if err != nil {
return err
}
cache = cache.DelConversationIDs(NotUserIDs)
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
}
// clear cache
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
return cache.DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
return cache.ExecDel(ctx)
})
}
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil {
return err
}
return c.cache.DelUsersConversation(userIDs, conversationID).ExecDel(ctx)
return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
@ -102,45 +103,32 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
userIDList := []string{conversation.OwnerUserID, conversation.UserID}
conversationTx := c.conversationDB.NewTx(tx)
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDList, conversation.ConversationID)
if err != nil {
return err
}
filedMap := map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}
if len(haveUserIDs) > 0 {
err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
cache := c.cache.NewCache()
for _, v := range [][3]string{{conversation.OwnerUserID, conversation.ConversationID, conversation.UserID}, {conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType), conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{v[1]})
if err != nil {
return err
}
}
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDList)
var cList []*relationTb.ConversationModel
for _, v := range NotUserIDs {
temp := new(relationTb.ConversationModel)
if v == conversation.UserID {
temp.OwnerUserID = conversation.UserID
temp.ConversationID = utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType)
temp.ConversationType = constant.SingleChatType
temp.UserID = conversation.OwnerUserID
temp.IsPrivateChat = conversation.IsPrivateChat
} else {
if err := utils.CopyStructFields(temp, conversation); err != nil {
if len(haveUserIDs) > 0 {
_, err := conversationTx.UpdateByMap(ctx, []string{v[0]}, v[1], map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
if err != nil {
return err
}
temp.OwnerUserID = v
}
cList = append(cList, temp)
}
if len(NotUserIDs) > 0 {
err = c.conversationDB.Create(ctx, cList)
if err != nil {
return err
cache = cache.DelUsersConversation(v[1], v[0])
} else {
newConversation := *conversation
newConversation.OwnerUserID = v[0]
newConversation.UserID = v[2]
newConversation.ConversationID = v[1]
newConversation.IsPrivateChat = conversation.IsPrivateChat
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]})
}
}
// clear cache
return c.cache.DelConversationIDs(NotUserIDs).DelUsersConversation(haveUserIDs, conversation.ConversationID).ExecDel(ctx)
return c.cache.ExecDel(ctx)
})
}
@ -168,9 +156,11 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
return err
}
if len(existConversations) > 0 {
err = conversationTx.Update(ctx, conversations)
if err != nil {
return err
for _, conversation := range conversations {
err = conversationTx.Update(ctx, conversation)
if err != nil {
return err
}
}
}
var existConversationIDs []string

View File

@ -29,12 +29,13 @@ func (c *ConversationGorm) Delete(ctx context.Context, groupIDs []string) (err e
return utils.Wrap(c.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.ConversationModel{}).Error, "")
}
func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (err error) {
return utils.Wrap(c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args).Error, "")
func (c *ConversationGorm) UpdateByMap(ctx context.Context, userIDList []string, conversationID string, args map[string]interface{}) (rows int64, err error) {
result := c.db(ctx).Where("owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Updates(args)
return result.RowsAffected, utils.Wrap(result.Error, "")
}
func (c *ConversationGorm) Update(ctx context.Context, conversations []*relation.ConversationModel) (err error) {
return utils.Wrap(c.db(ctx).Updates(&conversations).Error, "")
func (c *ConversationGorm) Update(ctx context.Context, conversation *relation.ConversationModel) (err error) {
return utils.Wrap(c.db(ctx).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).Updates(conversation).Error, "")
}
func (c *ConversationGorm) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*relation.ConversationModel, err error) {
@ -47,8 +48,8 @@ func (c *ConversationGorm) Take(ctx context.Context, userID, conversationID stri
return cc, utils.Wrap(c.db(ctx).Where("conversation_id = ? And owner_user_id = ?", conversationID, userID).Take(cc).Error, "")
}
func (c *ConversationGorm) FindUserID(ctx context.Context, userIDList []string, conversationID string) (existUserID []string, err error) {
return existUserID, utils.Wrap(c.db(ctx).Where(" owner_user_id IN (?) and conversation_id=?", userIDList, conversationID).Pluck("owner_user_id", &existUserID).Error, "")
func (c *ConversationGorm) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) (existUserID []string, err error) {
return existUserID, utils.Wrap(c.db(ctx).Where(" owner_user_id IN (?) and conversation_id in (?)", userIDs, conversationIDs).Pluck("owner_user_id", &existUserID).Error, "")
}
func (c *ConversationGorm) FindConversationID(ctx context.Context, userID string, conversationIDList []string) (existConversationID []string, err error) {

View File

@ -32,10 +32,10 @@ func (ConversationModel) TableName() string {
type ConversationModelInterface interface {
Create(ctx context.Context, conversations []*ConversationModel) (err error)
Delete(ctx context.Context, groupIDs []string) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) (err error)
Update(ctx context.Context, conversations []*ConversationModel) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) (rows int64, err error)
Update(ctx context.Context, conversation *ConversationModel) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*ConversationModel, err error)
FindUserID(ctx context.Context, userIDs []string, conversationID string) ([]string, error)
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)
FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error)
Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error)
FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error)