Merge branch 'openimsdk:main' into main

This commit is contained in:
chao 2025-03-25 15:55:56 +08:00 committed by GitHub
commit 13cb82488c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 630 additions and 478 deletions

View File

@ -12,6 +12,10 @@ jobs:
go-build: go-build:
name: Test with go ${{ matrix.go_version }} on ${{ matrix.os }} name: Test with go ${{ matrix.go_version }} on ${{ matrix.os }}
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
env:
SHARE_CONFIG_PATH: config/share.yml
permissions: permissions:
contents: write contents: write
pull-requests: write pull-requests: write
@ -40,6 +44,10 @@ jobs:
with: with:
compose-file: "./docker-compose.yml" compose-file: "./docker-compose.yml"
- name: Modify Server Configuration
run: |
yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
# - name: Get Internal IP Address # - name: Get Internal IP Address
# id: get-ip # id: get-ip
# run: | # run: |
@ -71,6 +79,11 @@ jobs:
go mod download go mod download
go install github.com/magefile/mage@latest go install github.com/magefile/mage@latest
- name: Modify Chat Configuration
run: |
cd ${{ github.workspace }}/chat-repo
yq e '.openIM.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
- name: Build and test Chat Services - name: Build and test Chat Services
run: | run: |
cd ${{ github.workspace }}/chat-repo cd ${{ github.workspace }}/chat-repo
@ -132,7 +145,7 @@ jobs:
# Test get admin token # Test get admin token
get_admin_token_response=$(curl -X POST -H "Content-Type: application/json" -H "operationID: imAdmin" -d '{ get_admin_token_response=$(curl -X POST -H "Content-Type: application/json" -H "operationID: imAdmin" -d '{
"secret": "openIM123", "secret": "123456",
"platformID": 2, "platformID": 2,
"userID": "imAdmin" "userID": "imAdmin"
}' http://127.0.0.1:10002/auth/get_admin_token) }' http://127.0.0.1:10002/auth/get_admin_token)
@ -169,7 +182,8 @@ jobs:
contents: write contents: write
env: env:
SDK_DIR: openim-sdk-core SDK_DIR: openim-sdk-core
CONFIG_PATH: config/notification.yml NOTIFICATION_CONFIG_PATH: config/notification.yml
SHARE_CONFIG_PATH: config/share.yml
strategy: strategy:
matrix: matrix:
@ -184,7 +198,7 @@ jobs:
uses: actions/checkout@v4 uses: actions/checkout@v4
with: with:
repository: "openimsdk/openim-sdk-core" repository: "openimsdk/openim-sdk-core"
ref: "release-v3.8" ref: "main"
path: ${{ env.SDK_DIR }} path: ${{ env.SDK_DIR }}
- name: Set up Go ${{ matrix.go_version }} - name: Set up Go ${{ matrix.go_version }}
@ -199,8 +213,9 @@ jobs:
- name: Modify Server Configuration - name: Modify Server Configuration
run: | run: |
yq e '.groupCreated.isSendMsg = true' -i ${{ env.CONFIG_PATH }} yq e '.groupCreated.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }}
yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.CONFIG_PATH }} yq e '.friendApplicationApproved.isSendMsg = true' -i ${{ env.NOTIFICATION_CONFIG_PATH }}
yq e '.secret = 123456' -i ${{ env.SHARE_CONFIG_PATH }}
- name: Start Server Services - name: Start Server Services
run: | run: |

View File

@ -181,3 +181,19 @@ afterImportFriends:
afterRemoveBlack: afterRemoveBlack:
enable: false enable: false
timeout: 5 timeout: 5
beforeCreateSingleChatConversations:
enable: false
timeout: 5
failedContinue: false
afterCreateSingleChatConversations:
enable: false
timeout: 5
failedContinue: false
beforeCreateGroupChatConversations:
enable: false
timeout: 5
failedContinue: false
afterCreateGroupChatConversations:
enable: false
timeout: 5
failedContinue: false

2
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.79 github.com/openimsdk/protocol v0.0.72-alpha.81
github.com/openimsdk/tools v0.0.50-alpha.74 github.com/openimsdk/tools v0.0.50-alpha.74
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0

4
go.sum
View File

@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= github.com/openimsdk/protocol v0.0.72-alpha.81 h1:6tDuZ3Anfi1uhX/V5mWxITqJnGQPnvgeaxeqJlEHIVE=
github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/protocol v0.0.72-alpha.81/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc=
github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=

View File

@ -16,6 +16,7 @@ package api
import ( import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/a2r" "github.com/openimsdk/tools/a2r"
) )
@ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) {
func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client)
} }
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
}

View File

@ -2,10 +2,14 @@ package jssdk
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort" "sort"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/jssdk"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(conversationIDs) == 0 { if len(conversationIDs) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(activeConversation) == 0 { if len(activeConversation) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
sortConversations := sortActiveConversations{ sortConversations := sortActiveConversations{
Conversation: activeConversation, Conversation: activeConversation,
} }
@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if err != nil { if err != nil {
return nil, err return nil, err
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
return c.ConversationID return c.ConversationID
}) })
@ -156,17 +162,16 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if !ok { if !ok {
continue continue
} }
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0]
}
resp = append(resp, &jssdk.ConversationMsg{ resp = append(resp, &jssdk.ConversationMsg{
Conversation: conv, Conversation: conv,
LastMsg: lastMsg, LastMsg: msgList.Msgs[0],
MaxSeq: c.MaxSeq, MaxSeq: c.MaxSeq,
ReadSeq: readSeq[c.ConversationID], ReadSeq: readSeq[c.ConversationID],
}) })
} }
}
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
} }
@ -219,19 +224,19 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
return nil, err return nil, err
} }
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
for _, c := range conversations { for _, c := range conversations {
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0]
}
resp = append(resp, &jssdk.ConversationMsg{ resp = append(resp, &jssdk.ConversationMsg{
Conversation: c, Conversation: c,
LastMsg: lastMsg, LastMsg: msgList.Msgs[0],
MaxSeq: maxSeqs[c.ConversationID], MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID], ReadSeq: readSeqs[c.ConversationID],
}) })
} }
}
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
} }
@ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
UnreadCount: unreadCount, UnreadCount: unreadCount,
}, nil }, nil
} }
// This function checks whether the latest MaxSeq message is valid.
// If not, it needs to fetch a valid message again.
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
var conversationIDs []string
for conversationID, message := range messages {
allInValid := true
for _, data := range message.Msgs {
if data.Status < constant.MsgStatusHasDeleted {
allInValid = false
break
}
}
if allInValid {
conversationIDs = append(conversationIDs, conversationID)
}
}
if len(conversationIDs) > 0 {
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: userID,
ConversationIDs: conversationIDs,
})
if err != nil {
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
return
}
for conversationID, message := range resp.Msgs {
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
}
}
}

View File

@ -551,11 +551,3 @@ func (m *MessageApi) SearchMsg(c *gin.Context) {
func (m *MessageApi) GetServerTime(c *gin.Context) { func (m *MessageApi) GetServerTime(c *gin.Context) {
a2r.Call(c, msg.MsgClient.GetServerTime, m.Client) a2r.Call(c, msg.MsgClient.GetServerTime, m.Client)
} }
func (m *MessageApi) GetStreamMsg(c *gin.Context) {
a2r.Call(c, msg.MsgClient.GetServerTime, m.Client)
}
func (m *MessageApi) AppendStreamMsg(c *gin.Context) {
a2r.Call(c, msg.MsgClient.GetServerTime, m.Client)
}

View File

@ -9,6 +9,8 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding" "github.com/gin-gonic/gin/binding"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/internal/api/jssdk"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -27,7 +29,6 @@ import (
"github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
clientv3 "go.etcd.io/etcd/client/v3"
) )
const ( const (
@ -246,8 +247,6 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
msgGroup.POST("/batch_send_msg", m.BatchSendMsg) msgGroup.POST("/batch_send_msg", m.BatchSendMsg)
msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess) msgGroup.POST("/check_msg_is_send_success", m.CheckMsgIsSendSuccess)
msgGroup.POST("/get_server_time", m.GetServerTime) msgGroup.POST("/get_server_time", m.GetServerTime)
msgGroup.POST("/get_stream_msg", m.GetStreamMsg)
msgGroup.POST("/append_stream_msg", m.AppendStreamMsg)
} }
// Conversation // Conversation
{ {
@ -264,6 +263,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser)
} }
{ {

View File

@ -0,0 +1,117 @@
package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/utils/datautil"
)
func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after)
return nil
}
func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after)
return nil
}

View File

@ -22,6 +22,8 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@ -30,6 +32,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@ -39,7 +42,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
) )
type conversationServer struct { type conversationServer struct {
@ -49,6 +51,7 @@ type conversationServer struct {
conversationNotificationSender *ConversationNotificationSender conversationNotificationSender *ConversationNotificationSender
config *Config config *Config
webhookClient *webhook.Client
userClient *rpcli.UserClient userClient *rpcli.UserClient
msgClient *rpcli.MsgClient msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient groupClient *rpcli.GroupClient
@ -60,6 +63,7 @@ type Config struct {
MongodbConfig config.Mongo MongodbConfig config.Mongo
NotificationConfig config.Notification NotificationConfig config.Notification
Share config.Share Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache LocalCacheConfig config.LocalCache
Discovery config.Discovery Discovery config.Discovery
} }
@ -90,16 +94,25 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil { if err != nil {
return err return err
} }
msgClient := rpcli.NewMsgClient(msgConn) msgClient := rpcli.NewMsgClient(msgConn)
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{ cs := conversationServer{
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), config: config,
conversationDatabase: controller.NewConversationDatabase(conversationDB, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn), userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn), groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient, msgClient: msgClient,
}) }
cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient)
cs.conversationDatabase = controller.NewConversationDatabase(
conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB),
mgocli.GetTx())
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &cs)
return nil return nil
} }
@ -317,6 +330,19 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return &pbconversation.SetConversationsResp{}, nil return &pbconversation.SetConversationsResp{}, nil
} }
func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) {
m := make(map[string]any)
if req.Ex != nil {
m["ex"] = req.Ex.Value
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil {
return nil, err
}
}
return &pbconversation.UpdateConversationsByUserResp{}, nil
}
// Get user IDs with "Do Not Disturb" enabled in super large groups. // Get user IDs with "Do Not Disturb" enabled in super large groups.
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
return nil, errs.New("deprecated") return nil, errs.New("deprecated")
@ -326,49 +352,76 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
req *pbconversation.CreateSingleChatConversationsReq, req *pbconversation.CreateSingleChatConversationsReq,
) (*pbconversation.CreateSingleChatConversationsResp, error) { ) (*pbconversation.CreateSingleChatConversationsResp, error) {
var conversation dbModel.Conversation
switch req.ConversationType { switch req.ConversationType {
case constant.SingleChatType: case constant.SingleChatType:
var conversation dbModel.Conversation // sendUser create
conversation.ConversationID = req.ConversationID conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID conversation.UserID = req.RecvID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil { if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
} }
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
// recvUser create
conversation2 := conversation conversation2 := conversation
conversation2.OwnerUserID = req.RecvID conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID conversation2.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2}) err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
if err != nil { if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
} }
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation2)
case constant.NotificationChatType: case constant.NotificationChatType:
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID conversation.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil { if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
} }
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
} }
return &pbconversation.CreateSingleChatConversationsResp{}, nil return &pbconversation.CreateSingleChatConversationsResp{}, nil
} }
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) { func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs) var conversation dbModel.Conversation
conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
conversation.GroupID = req.GroupID
conversation.ConversationType = constant.ReadGroupChatType
if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, &conversation); err != nil {
return nil, err
}
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs, &conversation)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil { c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation)
return nil, err
}
return &pbconversation.CreateGroupChatConversationsResp{}, nil return &pbconversation.CreateGroupChatConversationsResp{}, nil
} }

View File

@ -48,7 +48,3 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv
} }
m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
} }
func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) {
m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips)
}

View File

@ -17,6 +17,8 @@ package msg
import ( import (
"context" "context"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil"
@ -29,7 +31,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"google.golang.org/protobuf/proto"
) )
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
@ -49,11 +50,6 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.
func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) { func (m *msgServer) sendMsg(ctx context.Context, req *pbmsg.SendMsgReq, before **sdkws.MsgData) (*pbmsg.SendMsgResp, error) {
m.encapsulateMsgData(req.MsgData) m.encapsulateMsgData(req.MsgData)
if req.MsgData.ContentType == constant.Stream {
if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil {
return nil, err
}
}
switch req.MsgData.SessionType { switch req.MsgData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req, before) return m.sendMsgSingleChat(ctx, req, before)

View File

@ -61,7 +61,6 @@ type msgServer struct {
msg.UnimplementedMsgServer msg.UnimplementedMsgServer
RegisterCenter discovery.Conn // Service discovery registry for service registration. RegisterCenter discovery.Conn // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
StreamMsgDatabase controller.StreamMsgDatabase
UserLocalCache *rpccache.UserLocalCache // Local cache for user data. UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
@ -117,10 +116,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil { if err != nil {
return err return err
} }
streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User)
if err != nil { if err != nil {
@ -142,7 +137,6 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer) msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, redisProducer)
s := &msgServer{ s := &msgServer{
MsgDatabase: msgDatabase, MsgDatabase: msgDatabase,
StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg),
RegisterCenter: client, RegisterCenter: client,
UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb),
GroupLocalCache: rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb), GroupLocalCache: rpccache.NewGroupLocalCache(rpcli.NewGroupClient(groupConn), &config.LocalCacheConfig, rdb),

View File

@ -1,115 +0,0 @@
package msg
import (
"context"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
)
const StreamDeadlineTime = time.Second * 60 * 10
func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error {
now := time.Now()
val := &model.StreamMsg{
ClientMsgID: msgData.ClientMsgID,
ConversationID: msgprocessor.GetConversationIDByMsg(msgData),
UserID: msgData.SendID,
CreateTime: now,
DeadlineTime: now.Add(StreamDeadlineTime),
}
return m.StreamMsgDatabase.CreateStreamMsg(ctx, val)
}
func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID)
if err != nil {
return nil, err
}
now := time.Now()
if !res.End && res.DeadlineTime.Before(now) {
res.End = true
res.DeadlineTime = now
_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now)
}
return res, nil
}
func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
if res.End {
return nil, errs.ErrNoPermission.WrapMsg("stream msg is end")
}
if len(res.Packets) < int(req.StartIndex) {
return nil, errs.ErrNoPermission.WrapMsg("start index is invalid")
}
if val := len(res.Packets) - int(req.StartIndex); val > 0 {
exist := res.Packets[int(req.StartIndex):]
for i, s := range exist {
if len(req.Packets) == 0 {
break
}
if s != req.Packets[i] {
return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i))
}
req.StartIndex++
req.Packets = req.Packets[1:]
}
}
if len(req.Packets) == 0 && res.End == req.End {
return &msg.AppendStreamMsgResp{}, nil
}
deadlineTime := time.Now().Add(StreamDeadlineTime)
if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil {
return nil, err
}
conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID)
if err != nil {
return nil, err
}
tips := &sdkws.StreamMsgTips{
ConversationID: res.ConversationID,
ClientMsgID: res.ClientMsgID,
StartIndex: req.StartIndex,
Packets: req.Packets,
End: req.End,
}
var (
recvID string
sessionType int32
)
if conversation.GroupID == "" {
sessionType = constant.SingleChatType
recvID = conversation.UserID
} else {
sessionType = constant.ReadGroupChatType
recvID = conversation.GroupID
}
m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips)
return &msg.AppendStreamMsgResp{}, nil
}
func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) {
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
if err != nil {
return nil, err
}
return &msg.GetStreamMsgResp{
ClientMsgID: res.ClientMsgID,
ConversationID: res.ConversationID,
UserID: res.UserID,
Packets: res.Packets,
End: res.End,
CreateTime: res.CreateTime.UnixMilli(),
DeadlineTime: res.DeadlineTime.UnixMilli(),
}, nil
}

View File

@ -49,6 +49,10 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
const (
defaultSecret = "openIM123"
)
type userServer struct { type userServer struct {
pbuser.UnimplementedUserServer pbuser.UnimplementedUserServer
online cache.OnlineCache online cache.OnlineCache
@ -88,7 +92,7 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
users := make([]*tablerelation.User, 0) users := make([]*tablerelation.User, 0)
for _, v := range config.Share.IMAdminUserID { for _, v := range config.Share.IMAdminUserID {
users = append(users, &tablerelation.User{UserID: v, Nickname: v, AppMangerLevel: constant.AppNotificationAdmin}) users = append(users, &tablerelation.User{UserID: v, Nickname: v, AppMangerLevel: constant.AppAdmin})
} }
userDB, err := mgo.NewUserMongo(mgocli.GetDB()) userDB, err := mgo.NewUserMongo(mgocli.GetDB())
if err != nil { if err != nil {
@ -273,6 +277,10 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
if len(req.Users) == 0 { if len(req.Users) == 0 {
return nil, errs.ErrArgs.WrapMsg("users is empty") return nil, errs.ErrArgs.WrapMsg("users is empty")
} }
// check if secret is changed
if s.config.Share.Secret == defaultSecret {
return nil, servererrs.ErrSecretNotChanged.Wrap()
}
if err = authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil { if err = authverify.CheckAdmin(ctx, s.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err
@ -605,7 +613,7 @@ func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.Get
if err != nil { if err != nil {
return nil, servererrs.ErrUserIDNotFound.Wrap() return nil, servererrs.ErrUserIDNotFound.Wrap()
} }
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel >= constant.AppNotificationAdmin { if user.AppMangerLevel >= constant.AppAdmin {
return &pbuser.GetNotificationAccountResp{Account: &pbuser.NotificationAccountInfo{ return &pbuser.GetNotificationAccountResp{Account: &pbuser.NotificationAccountInfo{
UserID: user.UserID, UserID: user.UserID,
FaceURL: user.FaceURL, FaceURL: user.FaceURL,

View File

@ -46,7 +46,7 @@ func TestName(t *testing.T) {
srv := &cronServer{ srv := &cronServer{
ctx: ctx, ctx: ctx,
config: &CronTaskConfig{ config: &Config{
CronTask: config.CronTask{ CronTask: config.CronTask{
RetainChatRecords: 1, RetainChatRecords: 1,
FileExpireTime: 1, FileExpireTime: 1,

View File

@ -62,4 +62,8 @@ const (
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand" CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
CallbackBeforeCreateSingleChatConversationsCommand = "callbackBeforeCreateSingleChatConversationsCommand"
CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand"
CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand"
CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand"
) )

View File

@ -0,0 +1,91 @@
package callbackstruct
type CallbackBeforeCreateSingleChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
UserID string `json:"user_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackBeforeCreateSingleChatConversationsResp struct {
CommonCallbackResp
RecvMsgOpt *int32 `json:"recv_msg_opt"`
IsPinned *bool `json:"is_pinned"`
IsPrivateChat *bool `json:"is_private_chat"`
BurnDuration *int32 `json:"burn_duration"`
GroupAtType *int32 `json:"group_at_type"`
AttachedInfo *string `json:"attached_info"`
Ex *string `json:"ex"`
}
type CallbackAfterCreateSingleChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
UserID string `json:"user_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackAfterCreateSingleChatConversationsResp struct {
CommonCallbackResp
}
type CallbackBeforeCreateGroupChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
GroupID string `json:"group_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackBeforeCreateGroupChatConversationsResp struct {
CommonCallbackResp
RecvMsgOpt *int32 `json:"recv_msg_opt"`
IsPinned *bool `json:"is_pinned"`
IsPrivateChat *bool `json:"is_private_chat"`
BurnDuration *int32 `json:"burn_duration"`
GroupAtType *int32 `json:"group_at_type"`
AttachedInfo *string `json:"attached_info"`
Ex *string `json:"ex"`
}
type CallbackAfterCreateGroupChatConversationsReq struct {
CallbackCommand `json:"callbackCommand"`
OwnerUserID string `json:"owner_user_id"`
ConversationID string `json:"conversation_id"`
ConversationType int32 `json:"conversation_type"`
GroupID string `json:"group_id"`
RecvMsgOpt int32 `json:"recv_msg_opt"`
IsPinned bool `json:"is_pinned"`
IsPrivateChat bool `json:"is_private_chat"`
BurnDuration int32 `json:"burn_duration"`
GroupAtType int32 `json:"group_at_type"`
AttachedInfo string `json:"attached_info"`
Ex string `json:"ex"`
}
type CallbackAfterCreateGroupChatConversationsResp struct {
CommonCallbackResp
}

View File

@ -41,6 +41,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
config.MongodbConfigFileName: &conversationConfig.MongodbConfig, config.MongodbConfigFileName: &conversationConfig.MongodbConfig,
config.ShareFileName: &conversationConfig.Share, config.ShareFileName: &conversationConfig.Share,
config.NotificationFileName: &conversationConfig.NotificationConfig, config.NotificationFileName: &conversationConfig.NotificationConfig,
config.WebhooksConfigFileName: &conversationConfig.WebhooksConfig,
config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig,
config.DiscoveryConfigFilename: &conversationConfig.Discovery, config.DiscoveryConfigFilename: &conversationConfig.Discovery,
} }
@ -67,6 +68,7 @@ func (a *ConversationRpcCmd) runE() error {
a.conversationConfig.NotificationConfig.GetConfigFileName(), a.conversationConfig.NotificationConfig.GetConfigFileName(),
a.conversationConfig.Share.GetConfigFileName(), a.conversationConfig.Share.GetConfigFileName(),
a.conversationConfig.LocalCacheConfig.GetConfigFileName(), a.conversationConfig.LocalCacheConfig.GetConfigFileName(),
a.conversationConfig.WebhooksConfig.GetConfigFileName(),
a.conversationConfig.Discovery.GetConfigFileName(), a.conversationConfig.Discovery.GetConfigFileName(),
}, nil, }, nil,
conversation.Start) conversation.Start)

View File

@ -435,6 +435,10 @@ type Webhooks struct {
BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"` BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"`
AfterImportFriends AfterConfig `yaml:"afterImportFriends"` AfterImportFriends AfterConfig `yaml:"afterImportFriends"`
AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"` AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"`
BeforeCreateSingleChatConversations BeforeConfig `yaml:"beforeCreateSingleChatConversations"`
AfterCreateSingleChatConversations AfterConfig `yaml:"afterCreateSingleChatConversations"`
BeforeCreateGroupChatConversations BeforeConfig `yaml:"beforeCreateGroupChatConversations"`
AfterCreateGroupChatConversations AfterConfig `yaml:"afterCreateGroupChatConversations"`
} }
type ZooKeeper struct { type ZooKeeper struct {

View File

@ -10,7 +10,7 @@ import (
func TestLoadLogConfig(t *testing.T) { func TestLoadLogConfig(t *testing.T) {
var log Log var log Log
os.Setenv("IMENV_LOG_REMAINLOGLEVEL", "5") os.Setenv("IMENV_LOG_REMAINLOGLEVEL", "5")
err := Load("../../../config/", "log.yml", "IMENV_LOG", "source", &log) err := Load("../../../config/", "log.yml", "IMENV_LOG", &log)
assert.Nil(t, err) assert.Nil(t, err)
t.Log(log.RemainLogLevel) t.Log(log.RemainLogLevel)
// assert.Equal(t, "../../../../logs/", log.StorageLocation) // assert.Equal(t, "../../../../logs/", log.StorageLocation)
@ -22,7 +22,7 @@ func TestLoadMongoConfig(t *testing.T) {
os.Setenv("IMENV_MONGODB_PASSWORD", "openIM1231231") os.Setenv("IMENV_MONGODB_PASSWORD", "openIM1231231")
// os.Setenv("IMENV_MONGODB_URI", "openIM123") // os.Setenv("IMENV_MONGODB_URI", "openIM123")
// os.Setenv("IMENV_MONGODB_USERNAME", "openIM123") // os.Setenv("IMENV_MONGODB_USERNAME", "openIM123")
err := Load("../../../config/", "mongodb.yml", "IMENV_MONGODB", "source", &mongo) err := Load("../../../config/", "mongodb.yml", "IMENV_MONGODB", &mongo)
// err := LoadApiConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo) // err := LoadApiConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo)
assert.Nil(t, err) assert.Nil(t, err)
@ -38,14 +38,14 @@ func TestLoadMongoConfig(t *testing.T) {
func TestLoadMinioConfig(t *testing.T) { func TestLoadMinioConfig(t *testing.T) {
var storageConfig Minio var storageConfig Minio
err := Load("../../../config/minio.yml", "IMENV_MINIO", "", "source", &storageConfig) err := Load("../../../config/minio.yml", "IMENV_MINIO", "", &storageConfig)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "openim", storageConfig.Bucket) assert.Equal(t, "openim", storageConfig.Bucket)
} }
func TestLoadWebhooksConfig(t *testing.T) { func TestLoadWebhooksConfig(t *testing.T) {
var webhooks Webhooks var webhooks Webhooks
err := Load("../../../config/webhooks.yml", "IMENV_WEBHOOKS", "", "source", &webhooks) err := Load("../../../config/webhooks.yml", "IMENV_WEBHOOKS", "", &webhooks)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 5, webhooks.BeforeAddBlack.Timeout) assert.Equal(t, 5, webhooks.BeforeAddBlack.Timeout)
@ -53,7 +53,7 @@ func TestLoadWebhooksConfig(t *testing.T) {
func TestLoadOpenIMRpcUserConfig(t *testing.T) { func TestLoadOpenIMRpcUserConfig(t *testing.T) {
var user User var user User
err := Load("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", "", "source", &user) err := Load("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", "", &user)
assert.Nil(t, err) assert.Nil(t, err)
//export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0" //export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0"
assert.Equal(t, "0.0.0.0", user.RPC.ListenIP) assert.Equal(t, "0.0.0.0", user.RPC.ListenIP)
@ -63,14 +63,14 @@ func TestLoadOpenIMRpcUserConfig(t *testing.T) {
func TestLoadNotificationConfig(t *testing.T) { func TestLoadNotificationConfig(t *testing.T) {
var noti Notification var noti Notification
err := Load("../../../config/notification.yml", "IMENV_NOTIFICATION", "", "source", &noti) err := Load("../../../config/notification.yml", "IMENV_NOTIFICATION", "", &noti)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "Your friend's profile has been changed", noti.FriendRemarkSet.OfflinePush.Title) assert.Equal(t, "Your friend's profile has been changed", noti.FriendRemarkSet.OfflinePush.Title)
} }
func TestLoadOpenIMThirdConfig(t *testing.T) { func TestLoadOpenIMThirdConfig(t *testing.T) {
var third Third var third Third
err := Load("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", "", "source", &third) err := Load("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", "", &third)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, "enabled", third.Object.Enable) assert.Equal(t, "enabled", third.Object.Enable)
assert.Equal(t, "https://oss-cn-chengdu.aliyuncs.com", third.Object.Oss.Endpoint) assert.Equal(t, "https://oss-cn-chengdu.aliyuncs.com", third.Object.Oss.Endpoint)
@ -86,7 +86,7 @@ func TestLoadOpenIMThirdConfig(t *testing.T) {
func TestTransferConfig(t *testing.T) { func TestTransferConfig(t *testing.T) {
var tran MsgTransfer var tran MsgTransfer
err := Load("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", "", "source", &tran) err := Load("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", "", &tran)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, true, tran.Prometheus.Enable) assert.Equal(t, true, tran.Prometheus.Enable)
assert.Equal(t, true, tran.Prometheus.AutoSetPorts) assert.Equal(t, true, tran.Prometheus.AutoSetPorts)

View File

@ -38,6 +38,7 @@ const (
// General error codes. // General error codes.
const ( const (
NoError = 0 // No error NoError = 0 // No error
DatabaseError = 90002 // Database error (redis/mysql, etc.) DatabaseError = 90002 // Database error (redis/mysql, etc.)
NetworkError = 90004 // Network error NetworkError = 90004 // Network error
DataError = 90007 // Data error DataError = 90007 // Data error
@ -50,6 +51,7 @@ const (
NoPermissionError = 1002 // Insufficient permission NoPermissionError = 1002 // Insufficient permission
DuplicateKeyError = 1003 DuplicateKeyError = 1003
RecordNotFoundError = 1004 // Record does not exist RecordNotFoundError = 1004 // Record does not exist
SecretNotChangedError = 1050 // secret not changed
// Account error codes. // Account error codes.
UserIDNotFoundError = 1101 // UserID does not exist or is not registered UserIDNotFoundError = 1101 // UserID does not exist or is not registered

View File

@ -17,6 +17,8 @@ package servererrs
import "github.com/openimsdk/tools/errs" import "github.com/openimsdk/tools/errs"
var ( var (
ErrSecretNotChanged = errs.NewCodeError(SecretNotChangedError, "secret not changed, please change secret in config/share.yml for security reasons")
ErrDatabase = errs.NewCodeError(DatabaseError, "DatabaseError") ErrDatabase = errs.NewCodeError(DatabaseError, "DatabaseError")
ErrNetwork = errs.NewCodeError(NetworkError, "NetworkError") ErrNetwork = errs.NewCodeError(NetworkError, "NetworkError")
ErrCallback = errs.NewCodeError(CallbackError, "CallbackError") ErrCallback = errs.NewCodeError(CallbackError, "CallbackError")

View File

@ -80,8 +80,6 @@ func (a *authDatabase) BatchSetTokenMapByUidPid(ctx context.Context, tokens []st
// Create Token. // Create Token.
func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) {
isAdmin := authverify.IsManagerUserID(userID, a.adminUserIDs)
if !isAdmin {
tokens, err := a.cache.GetAllTokensWithoutError(ctx, userID) tokens, err := a.cache.GetAllTokensWithoutError(ctx, userID)
if err != nil { if err != nil {
return "", err return "", err
@ -106,7 +104,6 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
log.ZDebug(ctx, "kicked token in create token", "token", k) log.ZDebug(ctx, "kicked token in create token", "token", k)
} }
} }
}
claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire) claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire)
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
@ -115,11 +112,9 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI
return "", errs.WrapMsg(err, "token.SignedString") return "", errs.WrapMsg(err, "token.SignedString")
} }
if !isAdmin {
if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil { if err = a.cache.SetTokenFlagEx(ctx, userID, platformID, tokenString, constant.NormalToken); err != nil {
return "", err return "", err
} }
}
return tokenString, nil return tokenString, nil
} }
@ -224,9 +219,6 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string
} }
//var adminTokenMaxNum = a.multiLogin.MaxNumOneEnd //var adminTokenMaxNum = a.multiLogin.MaxNumOneEnd
//if a.multiLogin.Policy == constant.Customize {
// adminTokenMaxNum = a.multiLogin.CustomizeLoginNum[constant.AdminPlatformID]
//}
//l := len(adminToken) //l := len(adminToken)
//if platformID == constant.AdminPlatformID { //if platformID == constant.AdminPlatformID {
// l++ // l++
@ -234,5 +226,8 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string
//if l > adminTokenMaxNum { //if l > adminTokenMaxNum {
// kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) // kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...)
//} //}
if platformID == constant.AdminPlatformID {
kickToken = append(kickToken, adminToken...)
}
return deleteToken, kickToken, nil return deleteToken, kickToken, nil
} }

View File

@ -22,7 +22,6 @@ import (
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/db/tx"
@ -47,8 +46,11 @@ type ConversationDatabase interface {
// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is // SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is
// transactional. // transactional.
SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error
// UpdateUserConversations updates all conversations related to a specified user.
// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user).
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error
// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs.
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error
// GetConversationIDs retrieves conversation IDs for a given user. // GetConversationIDs retrieves conversation IDs for a given user.
GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetUserConversationIDsHash gets the hash of conversation IDs for a given user. // GetUserConversationIDsHash gets the hash of conversation IDs for a given user.
@ -146,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
}) })
} }
func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error {
conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
for _, conversation := range conversations {
cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID)
}
return cache.ChainExecDel(ctx)
}
func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error {
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
if err != nil { if err != nil {
@ -298,10 +312,10 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
// return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) // return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
//} //}
func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversation *relationtb.Conversation) error {
return c.tx.Transaction(ctx, func(ctx context.Context) error { return c.tx.Transaction(ctx, func(ctx context.Context) error {
cache := c.cache.CloneConversationCache() cache := c.cache.CloneConversationCache()
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) conversationID := conversation.ConversationID
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID})
if err != nil { if err != nil {
return err return err
@ -309,7 +323,15 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs) notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs)
var conversations []*relationtb.Conversation var conversations []*relationtb.Conversation
for _, v := range notExistUserIDs { for _, v := range notExistUserIDs {
conversation := relationtb.Conversation{ConversationType: constant.ReadGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversation := relationtb.Conversation{
ConversationType: conversation.ConversationType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID,
// the parameters have default value
RecvMsgOpt: conversation.RecvMsgOpt, IsPinned: conversation.IsPinned, IsPrivateChat: conversation.IsPrivateChat,
BurnDuration: conversation.BurnDuration, GroupAtType: conversation.GroupAtType, AttachedInfo: conversation.AttachedInfo,
Ex: conversation.Ex, MaxSeq: conversation.MaxSeq, MinSeq: conversation.MinSeq, CreateTime: conversation.CreateTime,
MsgDestructTime: conversation.MsgDestructTime, IsMsgDestruct: conversation.IsMsgDestruct, LatestMsgDestructTime: conversation.LatestMsgDestructTime,
}
conversations = append(conversations, &conversation) conversations = append(conversations, &conversation)
cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID)
} }
@ -320,7 +342,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
return err return err
} }
} }
_, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": 0}) _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": conversation.MaxSeq})
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,34 +0,0 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)
type StreamMsgDatabase interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}
func NewStreamMsgDatabase(db database.StreamMsg) StreamMsgDatabase {
return &streamMsgDatabase{db: db}
}
type streamMsgDatabase struct {
db database.StreamMsg
}
func (m *streamMsgDatabase) CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error {
return m.db.CreateStreamMsg(ctx, model)
}
func (m *streamMsgDatabase) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
return m.db.AppendStreamMsg(ctx, clientMsgID, startIndex, packets, end, deadlineTime)
}
func (m *streamMsgDatabase) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return m.db.GetStreamMsg(ctx, clientMsgID)
}

View File

@ -21,12 +21,11 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/db/tx"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
) )

View File

@ -24,6 +24,7 @@ import (
type Conversation interface { type Conversation interface {
Create(ctx context.Context, conversations []*model.Conversation) (err error) Create(ctx context.Context, conversations []*model.Conversation) (err error)
UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error)
UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error)
Update(ctx context.Context, conversation *model.Conversation) (err error) Update(ctx context.Context, conversation *model.Conversation) (err error)
Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error)
FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error)

View File

@ -21,23 +21,32 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
coll := db.Collection(database.ConversationName) coll := db.Collection(database.ConversationName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
{
Keys: bson.D{ Keys: bson.D{
{Key: "owner_user_id", Value: 1}, {Key: "owner_user_id", Value: 1},
{Key: "conversation_id", Value: 1}, {Key: "conversation_id", Value: 1},
}, },
Options: options.Index().SetUnique(true), Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "user_id", Value: 1},
},
Options: options.Index(),
},
}) })
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con
return rows, nil return rows, nil
} }
func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) {
if len(args) == 0 {
return nil, nil
}
filter := bson.M{
"user_id": userID,
}
conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1}))
if err != nil {
return nil, err
}
err = mongoutil.IncrVersion(func() error {
_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args})
if err != nil {
return err
}
return nil
}, func() error {
for _, conversation := range conversations {
if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
return conversations, nil
}
func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) {
return mongoutil.IncrVersion(func() error { return mongoutil.IncrVersion(func() error {
return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)

View File

@ -1,60 +0,0 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/errs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
func NewStreamMsgMongo(db *mongo.Database) (*StreamMsgMongo, error) {
coll := db.Collection(database.StreamMsgName)
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "client_msg_id", Value: 1},
},
Options: options.Index().SetUnique(true),
})
if err != nil {
return nil, errs.Wrap(err)
}
return &StreamMsgMongo{coll: coll}, nil
}
type StreamMsgMongo struct {
coll *mongo.Collection
}
func (m *StreamMsgMongo) CreateStreamMsg(ctx context.Context, val *model.StreamMsg) error {
if val.Packets == nil {
val.Packets = []string{}
}
return mongoutil.InsertMany(ctx, m.coll, []*model.StreamMsg{val})
}
func (m *StreamMsgMongo) AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error {
update := bson.M{
"$set": bson.M{
"end": end,
"deadline_time": deadlineTime,
},
}
if len(packets) > 0 {
update["$push"] = bson.M{
"packets": bson.M{
"$each": packets,
"$position": startIndex,
},
}
}
return mongoutil.UpdateOne(ctx, m.coll, bson.M{"client_msg_id": clientMsgID, "end": false}, update, true)
}
func (m *StreamMsgMongo) GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
return mongoutil.FindOne[*model.StreamMsg](ctx, m.coll, bson.M{"client_msg_id": clientMsgID})
}

View File

@ -1,13 +0,0 @@
package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
)
type StreamMsg interface {
CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error
AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool, deadlineTime time.Time) error
GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error)
}

View File

@ -1,21 +0,0 @@
package model
import (
"time"
)
const (
StreamMsgStatusWait = 0
StreamMsgStatusDone = 1
StreamMsgStatusFail = 2
)
type StreamMsg struct {
ClientMsgID string `bson:"client_msg_id"`
ConversationID string `bson:"conversation_id"`
UserID string `bson:"user_id"`
Packets []string `bson:"packets"`
End bool `bson:"end"`
CreateTime time.Time `bson:"create_time"`
DeadlineTime time.Time `bson:"deadline_time"`
}

View File

@ -2,9 +2,11 @@ package rpcli
import ( import (
"context" "context"
"google.golang.org/grpc"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"google.golang.org/grpc"
) )
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {