conversation

This commit is contained in:
wangchuxiao 2023-05-12 16:45:50 +08:00
parent 01ccb6f404
commit 7456d3e8c0
12 changed files with 1824 additions and 1221 deletions

View File

@ -159,14 +159,10 @@ services:
grafana:
image: grafana/grafana
volumes:
# - ./grafana/dashboards/dashboard.json:/var/lib/grafana/dashboards/dashboard.json
# - ./grafana/provisioning/dashboard.yaml:/etc/grafana/provisioning/dashboards/dashboard.yaml
- ./.docker-compose_cfg/datasource-compose.yaml:/etc/grafana/provisioning/datasources/datasource.yaml
- ./.docker-compose_cfg/grafana.ini:/etc/grafana/grafana.ini
- ./.docker-compose_cfg/node-exporter-full_rev1.json:/var/lib/grafana/dashboards/node-exporter-full_rev1.json
container_name: grafana
# ports:
# - 10007:10007
depends_on:
- prometheus
network_mode: "host"

View File

@ -2,7 +2,6 @@ package api
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/a2r"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
@ -22,9 +21,7 @@ type Auth struct {
}
func (o *Auth) client(ctx context.Context) (auth.AuthClient, error) {
log.ZDebug(ctx, "before get grpc conn from zk registry")
conn, err := o.c.GetConn(ctx, config.Config.RpcRegisterName.OpenImAuthName)
log.ZDebug(ctx, "after get grpc conn from zk registry")
if err != nil {
return nil, err
}

View File

@ -54,11 +54,11 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
if err != nil {
return nil, err
}
if len(conversations) > 0 {
resp.Conversation = convert.ConversationDB2Pb(conversations[0])
return resp, nil
if len(conversations) < 1 {
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
}
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
resp.Conversation = convert.ConversationDB2Pb(conversations[0])
return resp, nil
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
@ -126,12 +126,9 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
return nil, err
}
}
var conversation tableRelation.ConversationModel
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
conversation := convert.ConversationPb2DB(req.Conversation)
if req.FieldType == constant.FieldIsPrivateChat {
err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, &conversation)
err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, []*tableRelation.ConversationModel{conversation})
if err != nil {
return nil, err
}
@ -144,8 +141,6 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
filedMap["recv_msg_opt"] = req.Conversation.RecvMsgOpt
case constant.FieldGroupAtType:
filedMap["group_at_type"] = req.Conversation.GroupAtType
case constant.FieldIsNotInGroup:
filedMap["is_not_in_group"] = req.Conversation.IsNotInGroup
case constant.FieldIsPinned:
filedMap["is_pinned"] = req.Conversation.IsPinned
case constant.FieldEx:
@ -155,10 +150,11 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
case constant.FieldUnread:
isSyncConversation = false
filedMap["update_unread_count_time"] = req.Conversation.UpdateUnreadCountTime
filedMap["has_read_seq"] = req.Conversation.HasReadSeq
case constant.FieldBurnDuration:
filedMap["burn_duration"] = req.Conversation.BurnDuration
}
err = c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap)
err = c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDList, conversation, filedMap)
if err != nil {
return nil, err
}
@ -169,12 +165,87 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
}
} else {
for _, v := range req.UserIDList {
c.conversationNotificationSender.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
c.conversationNotificationSender.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime, req.Conversation.HasReadSeq)
}
}
return resp, nil
}
func (c *conversationServer) SetConversations(ctx context.Context, req *pbConversation.SetConversationsReq) (*pbConversation.SetConversationsResp, error) {
isSyncConversation := true
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
return nil, err
}
if groupInfo.Status == constant.GroupStatusDismissed {
return nil, err
}
}
var conversation tableRelation.ConversationModel
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
conversation.GroupID = req.Conversation.GroupID
m := make(map[string]interface{})
if req.Conversation.RecvMsgOpt != nil {
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
}
if req.Conversation.DraftTextTime != nil {
m["draft_text_time"] = req.Conversation.DraftTextTime.Value
}
if req.Conversation.UnreadCount != nil {
m["unread_count"] = req.Conversation.UnreadCount.Value
}
if req.Conversation.AttachedInfo != nil {
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
if req.Conversation.Ex != nil {
m["ex"] = req.Conversation.Ex.Value
}
if req.Conversation.IsPinned != nil {
m["is_pinned"] = req.Conversation.IsPinned.Value
}
if req.Conversation.IsPrivateChat != nil {
var conversations []*tableRelation.ConversationModel
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
conversation.OwnerUserID = ownerUserID
conversation.IsPrivateChat = req.Conversation.IsPrivateChat.Value
conversations = append(conversations, &conversation2)
}
if err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, conversations); err != nil {
return nil, err
}
for _, ownerUserID := range req.UserIDs {
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, ownerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value)
}
}
if req.Conversation.BurnDuration != nil {
m["burn_duration"] = req.Conversation.BurnDuration.Value
}
if req.Conversation.HasReadSeq != nil && req.Conversation.UpdateUnreadCountTime != nil {
isSyncConversation = false
m["has_read_seq"] = req.Conversation.HasReadSeq.Value
m["update_unread_count_time"] = req.Conversation.UpdateUnreadCountTime.Value
}
err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDs, &conversation, m)
if err != nil {
return nil, err
}
if isSyncConversation {
for _, v := range req.UserIDs {
c.conversationNotificationSender.ConversationChangeNotification(ctx, v)
}
} else {
for _, v := range req.UserIDs {
c.conversationNotificationSender.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime.Value, req.Conversation.HasReadSeq.Value)
}
}
return &pbConversation.SetConversationsResp{}, nil
}
// 获取超级大群开启免打扰的用户ID
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbConversation.GetRecvMsgNotNotifyUserIDsReq) (*pbConversation.GetRecvMsgNotNotifyUserIDsResp, error) {
userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID)

View File

@ -249,10 +249,10 @@ const (
FieldAttachedInfo = 3
FieldIsPrivateChat = 4
FieldGroupAtType = 5
FieldIsNotInGroup = 6
FieldEx = 7
FieldUnread = 8
FieldBurnDuration = 9
FieldHasReadSeq = 10
)
const (

View File

@ -15,7 +15,7 @@ import (
const (
scanCount = 3000
retryTimes = 5
maxRetryTimes = 5
retryInterval = time.Second * 1
)
@ -31,13 +31,14 @@ type metaCache interface {
}
func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
return &metaCacheRedis{rcClient: rcClient, keys: keys}
return &metaCacheRedis{rcClient: rcClient, keys: keys, maxRetryTimes: maxRetryTimes, retryInterval: retryInterval}
}
type metaCacheRedis struct {
rcClient *rockscache.Client
keys []string
maxRetryTimes int
retryInterval time.Duration
}
func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
@ -47,8 +48,9 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context) error {
for {
if err := m.rcClient.TagAsDeletedBatch2(ctx, m.keys); err != nil {
if retryTimes >= m.maxRetryTimes {
err = errs.ErrInternalServer.Wrap(fmt.Sprintf("delete cache error %v, retry times %d", err, retryTimes))
err = errs.ErrInternalServer.Wrap(fmt.Sprintf("delete cache error: %v, keys: %v, retry times %d, please check redis server", err, m.keys, retryTimes))
log.ZWarn(ctx, "delete cache failed", err, "keys", m.keys)
return err
}
retryTimes++
} else {

View File

@ -17,7 +17,7 @@ type ConversationDatabase interface {
//CreateConversation 创建一批新的会话
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationTb.ConversationModel) error
//FindConversations 根据会话ID获取某个用户的多个会话
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
//FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID
@ -95,39 +95,39 @@ func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context,
}
func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
if err := c.conversationDB.NewTx(tx).Create(ctx, conversations); err != nil {
return err
}
return nil
})
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
return nil
}
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error {
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
cache := c.cache.NewCache()
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
if err != nil {
return err
}
if len(haveUserIDs) > 0 {
_, err := conversationTx.UpdateByMap(ctx, []string{v[0]}, conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
for _, conversation := range conversations {
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
if err != nil {
return err
}
cache = cache.DelUsersConversation(conversation.ConversationID, v[0])
} else {
newConversation := *conversation
newConversation.OwnerUserID = v[0]
newConversation.UserID = v[1]
newConversation.ConversationID = conversation.ConversationID
newConversation.IsPrivateChat = conversation.IsPrivateChat
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
if len(haveUserIDs) > 0 {
_, err := conversationTx.UpdateByMap(ctx, []string{v[0]}, conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
if err != nil {
return err
}
cache = cache.DelUsersConversation(conversation.ConversationID, v[0])
} else {
newConversation := *conversation
newConversation.OwnerUserID = v[0]
newConversation.UserID = v[1]
newConversation.ConversationID = conversation.ConversationID
newConversation.IsPrivateChat = conversation.IsPrivateChat
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]})
}
cache = cache.DelConversationIDs([]string{v[0]})
}
}
return c.cache.ExecDel(ctx)

View File

@ -13,19 +13,17 @@ type ConversationModel struct {
UserID string `gorm:"column:user_id;type:char(64)" json:"userID"`
GroupID string `gorm:"column:group_id;type:char(128)" json:"groupID"`
RecvMsgOpt int32 `gorm:"column:recv_msg_opt" json:"recvMsgOpt"`
UnreadCount int32 `gorm:"column:unread_count" json:"unreadCount"`
DraftTextTime int64 `gorm:"column:draft_text_time" json:"draftTextTime"`
IsPinned bool `gorm:"column:is_pinned" json:"isPinned"`
IsPrivateChat bool `gorm:"column:is_private_chat" json:"isPrivateChat"`
BurnDuration int32 `gorm:"column:burn_duration;default:30" json:"burnDuration"`
GroupAtType int32 `gorm:"column:group_at_type" json:"groupAtType"`
IsNotInGroup bool `gorm:"column:is_not_in_group" json:"isNotInGroup"`
UpdateUnreadCountTime int64 `gorm:"column:update_unread_count_time" json:"updateUnreadCountTime"`
AttachedInfo string `gorm:"column:attached_info;type:varchar(1024)" json:"attachedInfo"`
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
MaxSeq int64 `gorm:"column:max_seq" json:"maxSeq"`
MinSeq int64 `gorm:"column:min_seq" json:"minSeq"`
HasReadSeq int64 `gorm:"column:is_read_seq" json:"hasReadSeq"`
HasReadSeq int64 `gorm:"column:has_read_seq" json:"hasReadSeq"`
}
func (ConversationModel) TableName() string {

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
syntax = "proto3";
package OpenIMServer.conversation;
import "wrapperspb/wrapperspb.proto";
option go_package = "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation";
message Conversation{
@ -15,18 +16,36 @@ message Conversation{
string attachedInfo = 10;
bool isPrivateChat = 11;
int32 groupAtType = 12;
bool isNotInGroup = 13;
string ex = 14;
int64 updateUnreadCountTime = 15;
int32 burnDuration = 16;
int64 minSeq = 17;
int64 maxSeq = 18;
int64 isReadSeq = 19;
string ex = 13;
int64 updateUnreadCountTime = 14;
int32 burnDuration = 15;
int64 minSeq = 16;
int64 maxSeq = 17;
int64 hasReadSeq = 18;
}
message ConversationReq{
string conversationID = 1;
int32 conversationType = 2;
string userID = 3;
string groupID = 4;
OpenIMServer.protobuf.Int32Value recvMsgOpt = 5;
OpenIMServer.protobuf.Int64Value draftTextTime = 6;
OpenIMServer.protobuf.BoolValue isPinned = 7;
OpenIMServer.protobuf.StringValue attachedInfo = 8;
OpenIMServer.protobuf.BoolValue isPrivateChat = 9;
OpenIMServer.protobuf.StringValue ex = 10;
OpenIMServer.protobuf.Int64Value updateUnreadCountTime = 11;
OpenIMServer.protobuf.Int32Value burnDuration = 12;
OpenIMServer.protobuf.Int64Value minSeq = 13;
OpenIMServer.protobuf.Int64Value maxSeq = 14;
OpenIMServer.protobuf.Int64Value hasReadSeq = 15;
}
message ModifyConversationFieldReq{
Conversation conversation = 1;
int32 fieldType = 2;
repeated string userIDList = 3;
repeated string userIDList = 1;
int32 FieldType = 2;
Conversation conversation = 3;
}
message ModifyConversationFieldResp{
@ -138,6 +157,13 @@ message GetConversationsHasReadAndMaxSeqResp {
map<string, Seqs> seqs = 1;
}
message SetConversationsReq {
repeated string userIDs = 1;
ConversationReq conversation = 2;
}
message SetConversationsResp {
}
service conversation {
rpc ModifyConversationField(ModifyConversationFieldReq)returns(ModifyConversationFieldResp);
@ -153,4 +179,5 @@ service conversation {
rpc DelGroupChatConversations(DelGroupChatConversationsReq) returns(DelGroupChatConversationsResp);
rpc GetConversationIDs(GetConversationIDsReq) returns(GetConversationIDsResp);
rpc GetConversationsHasReadAndMaxSeq(GetConversationsHasReadAndMaxSeqReq) returns(GetConversationsHasReadAndMaxSeqResp);
rpc SetConversations(SetConversationsReq) returns(SetConversationsResp);
}

File diff suppressed because it is too large Load Diff

View File

@ -394,11 +394,8 @@ message UserInfoUpdatedTips{
//////////////////////conversation/////////////////////
message ConversationUpdateTips{
string UserID = 1;
string userID = 1;
repeated string conversationIDList = 2;
int64 updateUnreadCountTime = 3;
}
message ConversationSetPrivateTips{
@ -407,6 +404,13 @@ message ConversationSetPrivateTips{
bool isPrivate = 3;
}
message ConversationHasReadTips {
string userID = 1;
string conversationID = 2;
int64 hasReadSeq = 3;
int64 unreadCountTime = 4;
}
////////////////////message///////////////////////
message seqs {
repeated int64 seqs = 1;

View File

@ -36,11 +36,12 @@ func (c *ConversationNotificationSender) ConversationChangeNotification(ctx cont
}
// 会话未读数同步
func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(ctx context.Context, userID, conversationID string, updateUnreadCountTime int64) error {
tips := &sdkws.ConversationUpdateTips{
UserID: userID,
ConversationIDList: []string{conversationID},
UpdateUnreadCountTime: updateUnreadCountTime,
func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(ctx context.Context, userID, conversationID string, unreadCountTime, hasReadSeq int64) error {
tips := &sdkws.ConversationHasReadTips{
UserID: userID,
ConversationID: conversationID,
HasReadSeq: hasReadSeq,
UnreadCountTime: unreadCountTime,
}
return c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
}