conversation update

This commit is contained in:
Gordon 2023-02-03 16:22:59 +08:00
parent 3b4c227519
commit 830ebdd472

View File

@ -1,17 +1,22 @@
package conversation package conversation
import ( import (
"Open_IM/internal/common/check"
chat "Open_IM/internal/rpc/msg" chat "Open_IM/internal/rpc/msg"
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db" "Open_IM/pkg/common/db/cache"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/db/controller"
rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/db/relation"
"Open_IM/pkg/common/db/table"
"Open_IM/pkg/common/db/unrelation"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prometheus" promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/getcdv3" "Open_IM/pkg/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation" pbConversation "Open_IM/pkg/proto/conversation"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"github.com/dtm-labs/rockscache"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -23,156 +28,55 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
type rpcConversation struct { type conversationServer struct {
rpcPort int rpcPort int
rpcRegisterName string rpcRegisterName string
etcdSchema string etcdSchema string
etcdAddr []string etcdAddr []string
groupChecker *check.GroupChecker
controller.ConversationInterface
} }
func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { func NewConversationServer(port int) *conversationServer {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbConversation.ModifyConversationFieldResp{}
var err error
isSyncConversation := true
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := imdb.GetGroupInfoByGroupID(req.Conversation.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.Conversation.GroupID, err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
if groupInfo.Status == constant.GroupStatusDismissed && !req.Conversation.IsNotInGroup && req.FieldType != constant.FieldUnread {
errMsg := "group status is dismissed"
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}
return resp, nil
}
}
var conversation imdb.Conversation
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CopyStructFields failed", *req.Conversation, err.Error())
}
haveUserID, _ := imdb.GetExistConversationUserIDList(req.UserIDList, req.Conversation.ConversationID)
switch req.FieldType {
case constant.FieldRecvMsgOpt:
for _, v := range req.UserIDList {
if err = db.DB.SetSingleConversationRecvMsgOpt(v, req.Conversation.ConversationID, req.Conversation.RecvMsgOpt); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt})
case constant.FieldGroupAtType:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"group_at_type": conversation.GroupAtType})
case constant.FieldIsNotInGroup:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_not_in_group": conversation.IsNotInGroup})
case constant.FieldIsPinned:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_pinned": conversation.IsPinned})
case constant.FieldIsPrivateChat:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
case constant.FieldEx:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"ex": conversation.Ex})
case constant.FieldAttachedInfo:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"attached_info": conversation.AttachedInfo})
case constant.FieldUnread:
isSyncConversation = false
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"update_unread_count_time": conversation.UpdateUnreadCountTime})
case constant.FieldBurnDuration:
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"burn_duration": conversation.BurnDuration})
}
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "UpdateColumnsConversations error", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
conversation.OwnerUserID = v
err = rocksCache.DelUserConversationIDListFromCache(v)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
err := imdb.SetOneConversation(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
// notification
if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat {
//sync peer user conversation if conversation is singleChatType
if err := syncPeerUserConversation(req.Conversation, req.OperationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "syncPeerUserConversation", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
} else {
if isSyncConversation {
for _, v := range req.UserIDList {
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
chat.ConversationChangeNotification(req.OperationID, v)
}
} else {
for _, v := range req.UserIDList {
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, conversation.UpdateUnreadCountTime)
}
}
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return", resp.String())
resp.CommonResp = &pbConversation.CommonResp{}
return resp, nil
}
func syncPeerUserConversation(conversation *pbConversation.Conversation, operationID string) error {
peerUserConversation := imdb.Conversation{
OwnerUserID: conversation.UserID,
ConversationID: utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType),
ConversationType: constant.SingleChatType,
UserID: conversation.OwnerUserID,
GroupID: "",
RecvMsgOpt: 0,
UnreadCount: 0,
DraftTextTime: 0,
IsPinned: false,
IsPrivateChat: conversation.IsPrivateChat,
AttachedInfo: "",
Ex: "",
}
err := imdb.PeerUserSetConversation(peerUserConversation)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
return err
}
err = rocksCache.DelConversationFromCache(conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType))
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID)
}
err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID)
}
chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat)
return nil
}
func NewRpcConversationServer(port int) *rpcConversation {
log.NewPrivateLog(constant.LogFileName) log.NewPrivateLog(constant.LogFileName)
return &rpcConversation{ c := conversationServer{
rpcPort: port, rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName, rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName,
etcdSchema: config.Config.Etcd.EtcdSchema, etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr, etcdAddr: config.Config.Etcd.EtcdAddr,
groupChecker: check.NewGroupChecker(),
} }
var cDB relation.Conversation
var cCache cache.ConversationCache
//mysql init
var mysql relation.Mysql
err := mysql.InitConn().AutoMigrateModel(&table.ConversationModel{})
if err != nil {
panic("db init err:" + err.Error())
}
if mysql.GormConn() != nil {
//get gorm model
cDB = relation.NewConversationGorm(mysql.GormConn())
} else {
panic("db init err:" + "conn is nil")
}
//redis init
var redis cache.RedisClient
redis.InitRedis()
rcClient := rockscache.NewClient(redis.GetClient(), rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
})
cCache = cache.NewConversationRedis(rcClient)
database := controller.NewConversationDataBase(cDB, cCache)
c.ConversationInterface = controller.NewConversationController(database)
return &c
} }
func (rpc *rpcConversation) Run() { func (c *conversationServer) Run() {
log.NewInfo("0", "rpc conversation start...") log.NewInfo("0", "rpc conversation start...")
listenIP := "" listenIP := ""
@ -181,11 +85,11 @@ func (rpc *rpcConversation) Run() {
} else { } else {
listenIP = config.Config.ListenIP listenIP = config.Config.ListenIP
} }
address := listenIP + ":" + strconv.Itoa(rpc.rpcPort) address := listenIP + ":" + strconv.Itoa(c.rpcPort)
listener, err := net.Listen("tcp", address) listener, err := net.Listen("tcp", address)
if err != nil { if err != nil {
panic("listening err:" + err.Error() + rpc.rpcRegisterName) panic("listening err:" + err.Error() + c.rpcRegisterName)
} }
log.NewInfo("0", "listen network success, ", address, listener) log.NewInfo("0", "listen network success, ", address, listener)
//grpc server //grpc server
@ -204,7 +108,7 @@ func (rpc *rpcConversation) Run() {
defer srv.GracefulStop() defer srv.GracefulStop()
//service registers with etcd //service registers with etcd
pbConversation.RegisterConversationServer(srv, rpc) pbConversation.RegisterConversationServer(srv, c)
rpcRegisterIP := config.Config.RpcRegisterIP rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" { if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP() rpcRegisterIP, err = utils.GetLocalIP()
@ -213,13 +117,13 @@ func (rpc *rpcConversation) Run() {
} }
} }
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP) log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10) err = getcdv3.RegisterEtcd(c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName, 10, "")
if err != nil { if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), log.NewError("0", "RegisterEtcd failed ", err.Error(),
rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
panic(utils.Wrap(err, "register conversation module rpc to etcd err")) panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
} }
log.NewInfo("0", "RegisterConversationServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) log.NewInfo("0", "RegisterConversationServer ok ", c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
err = srv.Serve(listener) err = srv.Serve(listener)
if err != nil { if err != nil {
log.NewError("0", "Serve failed ", err.Error()) log.NewError("0", "Serve failed ", err.Error())
@ -227,3 +131,141 @@ func (rpc *rpcConversation) Run() {
} }
log.NewInfo("0", "rpc conversation ok") log.NewInfo("0", "rpc conversation ok")
} }
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
if err != nil {
return nil, err
}
if len(conversations) > 0 {
if err := utils.CopyStructFields(resp.Conversation, &conversations[0]); err != nil {
return nil, err
}
return resp, nil
}
return nil, nil
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil {
return nil, err
}
return resp, nil
}
func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) {
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
}
if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil {
return nil, err
}
return resp, nil
}
func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) {
resp := &pbConversation.BatchSetConversationsResp{}
var conversations []*table.ConversationModel
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
return nil, err
}
err := c.ConversationInterface.SetUserConversations(ctx, req.OwnerUserID, conversations)
if err != nil {
return nil, err
}
chat.ConversationChangeNotification(ctx, req.OwnerUserID)
return resp, nil
}
func (c *conversationServer) SetConversation(ctx context.Context, req *pbConversation.SetConversationReq) (*pbConversation.SetConversationResp, error) {
panic("implement me")
}
func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) {
panic("implement me")
}
func (c *conversationServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) {
resp := &pbConversation.ModifyConversationFieldResp{}
var err error
isSyncConversation := true
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := c.groupChecker.GetGroupInfo(req.Conversation.GroupID)
if err != nil {
return nil, err
}
if groupInfo.Status == constant.GroupStatusDismissed && req.FieldType != constant.FieldUnread {
return nil, err
}
}
var conversation table.ConversationModel
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
if req.FieldType == constant.FieldIsPrivateChat {
err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, req.Conversation)
if err != nil {
return nil, err
}
chat.ConversationSetPrivateNotification(req.OperationID, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil
}
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
//if err != nil {
// return nil, err
//}
filedMap := make(map[string]interface{})
switch req.FieldType {
case constant.FieldRecvMsgOpt:
filedMap["recv_msg_opt"] = req.Conversation.RecvMsgOpt
case constant.FieldGroupAtType:
filedMap["group_at_type"] = req.Conversation.GroupAtType
case constant.FieldIsNotInGroup:
filedMap["is_not_in_group"] = req.Conversation.IsNotInGroup
case constant.FieldIsPinned:
filedMap["is_pinned"] = req.Conversation.IsPinned
case constant.FieldEx:
filedMap["ex"] = req.Conversation.Ex
case constant.FieldAttachedInfo:
filedMap["attached_info"] = req.Conversation.AttachedInfo
case constant.FieldUnread:
isSyncConversation = false
filedMap["update_unread_count_time"] = req.Conversation.UpdateUnreadCountTime
case constant.FieldBurnDuration:
filedMap["burn_duration"] = req.Conversation.BurnDuration
}
c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
err = c.ConversationInterface.UpdateUsersConversationFiled(ctx, haveUserID, req.Conversation.ConversationID, filedMap)
if err != nil {
return nil, err
}
var conversations []*pbConversation.Conversation
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
temp := new(pbConversation.Conversation)
_ = utils.CopyStructFields(temp, req.Conversation)
temp.OwnerUserID = v
conversations = append(conversations, temp)
}
err = c.ConversationInterface.CreateConversation(ctx, conversations)
if err != nil {
return nil, err
}
if isSyncConversation {
for _, v := range req.UserIDList {
chat.ConversationChangeNotification(req.OperationID, v)
}
} else {
for _, v := range req.UserIDList {
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
}
}
return resp, nil
}