From e4f3e34249eace95af2290148e7e626c455b3b81 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Mon, 23 Oct 2023 21:26:35 +0800 Subject: [PATCH] fix: api send messages for notification conversation. (#1254) * fix: to start im or chat, ZooKeeper must be started first. * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: go mod update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: get all userID Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: msggateway add online status call Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * chore: network mode change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * feat: add api of get server time Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: remove go work sum Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: pull message add isRead field Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: check msg-transfer script Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: start don't kill old process Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: check component Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: pull message set isRead only message come from single. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: add ex field to update group info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * cicd: robot automated Change * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: message log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * fxi: component check output valid info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fxi: component check output valid info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: send message test log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * cicd: robot automated Change * test: remove info log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: api of send message add sendTime field. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: add callback for update user's info. * cicd: robot automated Change * fix: change callback command name. * cicd: robot automated Change * fix: single chat unread status change. * fix: single chat unread status change. * fix: single chat unread status change. * fix: user status change. * cicd: robot automated Change * fix: user status change. * fix: user status change. * fix: user status change. * cicd: robot automated Change * fix: ws close when user logout. * fix: remove repeat platform on online status. * cicd: robot automated Change * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. --------- Signed-off-by: Gordon <1432970085@qq.com> Signed-off-by: withchao <993506633@qq.com> Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: withchao <993506633@qq.com> Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com> Co-authored-by: FGadvancer --- .golangci.yml | 2 +- cmd/openim-api/main.go | 1 - internal/api/msg.go | 32 +++------ internal/api/route.go | 1 - internal/api/user.go | 5 -- internal/msgtransfer/init.go | 4 +- .../msgtransfer/online_history_msg_handler.go | 26 ++++--- .../online_msg_to_mongo_handler.go | 2 +- internal/push/push_handler.go | 1 + internal/push/push_to_client.go | 9 ++- internal/rpc/conversation/conversaion.go | 72 ++++++++++++------- internal/rpc/friend/friend.go | 3 +- internal/rpc/msg/as_read.go | 5 +- internal/tools/msg.go | 6 +- pkg/apistruct/manage.go | 2 +- pkg/rpcclient/conversation.go | 13 ++-- pkg/rpcclient/msg.go | 11 +-- 17 files changed, 104 insertions(+), 91 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 050025b6e..8785b72d8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -171,7 +171,7 @@ linters-settings: # exclude_godoc_examples: false funlen: lines: 150 - statements: 50 + statements: 80 gci: # put imports beginning with prefix after 3rd-party packages; # only support one prefix diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index f6db87353..d1f5cb3f8 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -32,7 +32,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" ) - func main() { apiCmd := cmd.NewApiCmd() apiCmd.AddPortFlag() diff --git a/internal/api/msg.go b/internal/api/msg.go index b9b10e98e..028d77ca4 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -57,6 +57,10 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg) var newContent string options := make(map[string]bool, 5) switch params.ContentType { + case constant.OANotification: + notification := sdkws.NotificationElem{} + notification.Detail = utils.StructToJsonString(params.Content) + newContent = utils.StructToJsonString(¬ification) case constant.Text: fallthrough case constant.Picture: @@ -69,10 +73,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg) fallthrough case constant.File: fallthrough - case constant.CustomNotTriggerConversation: - fallthrough - case constant.CustomOnlineOnly: - fallthrough default: newContent = utils.StructToJsonString(params.Content) } @@ -82,11 +82,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg) if params.NotOfflinePush { utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false) } - if params.ContentType == constant.CustomOnlineOnly { - m.SetOptions(options, false) - } else if params.ContentType == constant.CustomNotTriggerConversation { - utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false) - } pbData := msg.SendMsgReq{ MsgData: &sdkws.MsgData{ SendID: params.SendID, @@ -105,14 +100,6 @@ func (m MessageApi) newUserSendMsgReq(c *gin.Context, params *apistruct.SendMsg) OfflinePushInfo: params.OfflinePushInfo, }, } - //if params.ContentType == constant.OANotification { - // var tips sdkws.TipsComm - // tips.JsonDetail = utils.StructToJsonString(params.Content) - // pbData.MsgData.Content, err = proto.Marshal(&tips) - // if err != nil { - // log.ZError(c, "Marshal failed ", err, "tips", tips.String()) - // } - //} return &pbData } @@ -180,15 +167,13 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM data = apistruct.FileElem{} case constant.Custom: data = apistruct.CustomElem{} - case constant.Revoke: - data = apistruct.RevokeElem{} case constant.OANotification: data = apistruct.OANotificationElem{} req.SessionType = constant.NotificationChatType - case constant.CustomNotTriggerConversation: - data = apistruct.CustomElem{} - case constant.CustomOnlineOnly: - data = apistruct.CustomElem{} + if !authverify.IsManagerUserID(req.SendID) { + return nil, errs.ErrNoPermission. + Wrap("only app manager can as sender send OANotificationElem") + } default: return nil, errs.ErrArgs.WithDetail("not support err contentType") } @@ -212,7 +197,6 @@ func (m *MessageApi) SendMessage(c *gin.Context) { apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message")) return } - sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg) if err != nil { log.ZError(c, "decodeData failed", err) diff --git a/internal/api/route.go b/internal/api/route.go index 9a639a2de..d714270b4 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -83,7 +83,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) - userRouterGroup.POST("/unsubscribe_users_status", ParseToken, u.UnSubscriberStatus) userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) } diff --git a/internal/api/user.go b/internal/api/user.go index 0a5a12698..86b7c0b0b 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -190,11 +190,6 @@ func (u *UserApi) SubscriberStatus(c *gin.Context) { a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c) } -// UnSubscriberStatus Unsubscribe a user's presence. -func (u *UserApi) UnSubscriberStatus(c *gin.Context) { - a2r.Call(user.UserClient.SubscribeOrCancelUsersStatus, u.Client, c) -} - // GetUserStatus Get the online status of the user. func (u *UserApi) GetUserStatus(c *gin.Context) { a2r.Call(user.UserClient.GetUserStatus, u.Client, c) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 7babd9a07..e5066633a 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -16,12 +16,13 @@ package msgtransfer import ( "fmt" - "sync" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "github.com/OpenIMSDK/tools/mw" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -30,7 +31,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/openimsdk/open-im-server/v3/pkg/common/prome" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index e83939a4c..b4556634c 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -283,20 +283,30 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg( return } if isNewConversation { - if storageList[0].SessionType == constant.SuperGroupChatType { - log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID) + switch storageList[0].SessionType { + case constant.SuperGroupChatType: + log.ZInfo(ctx, "group chat first create conversation", "conversationID", + conversationID) userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID) if err != nil { - log.ZWarn(ctx, "get group member ids error", err, "conversationID", conversationID) + log.ZWarn(ctx, "get group member ids error", err, "conversationID", + conversationID) } else { - if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, storageList[0].GroupID, userIDs); err != nil { - log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID) + if err := och.conversationRpcClient.GroupChatFirstCreateConversation(ctx, + storageList[0].GroupID, userIDs); err != nil { + log.ZWarn(ctx, "single chat first create conversation error", err, + "conversationID", conversationID) } } - } else { - if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, storageList[0].SendID); err != nil { - log.ZWarn(ctx, "single chat first create conversation error", err, "conversationID", conversationID) + case constant.SingleChatType, constant.NotificationChatType: + if err := och.conversationRpcClient.SingleChatFirstCreateConversation(ctx, storageList[0].RecvID, + storageList[0].SendID, conversationID, storageList[0].SessionType); err != nil { + log.ZWarn(ctx, "single chat or notification first create conversation error", err, + "conversationID", conversationID, "sessionType", storageList[0].SessionType) } + default: + log.ZWarn(ctx, "unknown session type", nil, "sessionType", + storageList[0].SessionType) } } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 8099d39d7..bfea6c433 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -62,7 +62,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo( log.ZError(ctx, "msgFromMQ.MsgData is empty", nil, "cMsg", cMsg) return } - log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.MsgData) + log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) if err != nil { log.ZError( diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 91363282f..a1a9ff08e 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -58,6 +58,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } sec := msgFromMQ.MsgData.SendTime / 1000 nowSec := utils.GetCurrentTimestampBySecond() + log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec) if nowSec-sec > 10 { return } diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 66b003eaa..ba0d65b39 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -126,13 +126,12 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg } func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error { - var notificationElem struct { - Detail string `json:"detail,omitempty"` - } - if err := json.Unmarshal(bytes, ¬ificationElem); err != nil { + var notification sdkws.NotificationElem + if err := json.Unmarshal(bytes, ¬ification); err != nil { return err } - return json.Unmarshal([]byte(notificationElem.Detail), t) + + return json.Unmarshal([]byte(notification.Detail), t) } func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index e76f83008..0ea7d54be 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -17,8 +17,6 @@ package conversation import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "google.golang.org/grpc" "github.com/OpenIMSDK/protocol/constant" @@ -114,7 +112,10 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers return resp, nil } -func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) { +//nolint +func (c *conversationServer) SetConversations(ctx context.Context, + req *pbconversation.SetConversationsReq, +) (*pbconversation.SetConversationsResp, error) { if req.Conversation == nil { return nil, errs.ErrArgs.Wrap("conversation must not be nil") } @@ -124,14 +125,8 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return nil, err } if groupInfo.Status == constant.GroupStatusDismissed { - return nil, err + return nil, errs.ErrDismissedAlready.Wrap("group dismissed") } - // for _, userID := range req.UserIDs { - // if _, err := c.groupRpcClient.GetGroupMemberCache(ctx, req.Conversation.GroupID, userID); err != nil { - // log.ZError(ctx, "user not in group", err, "userID", userID, "groupID", req.Conversation.GroupID) - // return nil, err - // } - // } } var unequal int var conv tablerelation.ConversationModel @@ -205,7 +200,14 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return nil, err } for _, userID := range req.UserIDs { - c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID) + err := c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, userID, req.Conversation.UserID, + req.Conversation.IsPrivateChat.Value, req.Conversation.ConversationID) + if err != nil { + log.ZWarn(ctx, "send conversation set private notification failed", err, + "userID", userID, "conversationID", req.Conversation.ConversationID) + + continue + } } } if req.Conversation.BurnDuration != nil { @@ -235,24 +237,40 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req } // create conversation without notification for msg redis transfer. -func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbconversation.CreateSingleChatConversationsReq) (*pbconversation.CreateSingleChatConversationsResp, error) { - var conversation tablerelation.ConversationModel - conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, req.RecvID, req.SendID) - conversation.ConversationType = constant.SingleChatType - conversation.OwnerUserID = req.SendID - conversation.UserID = req.RecvID - err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation}) - if err != nil { - log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) +func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, + req *pbconversation.CreateSingleChatConversationsReq, +) (*pbconversation.CreateSingleChatConversationsResp, error) { + switch req.ConversationType { + case constant.SingleChatType: + var conversation tablerelation.ConversationModel + conversation.ConversationID = req.ConversationID + conversation.ConversationType = req.ConversationType + conversation.OwnerUserID = req.SendID + conversation.UserID = req.RecvID + err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation}) + if err != nil { + log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) + } + + conversation2 := conversation + conversation2.OwnerUserID = req.RecvID + conversation2.UserID = req.SendID + err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation2}) + if err != nil { + log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) + } + case constant.NotificationChatType: + var conversation tablerelation.ConversationModel + conversation.ConversationID = req.ConversationID + conversation.ConversationType = req.ConversationType + conversation.OwnerUserID = req.RecvID + conversation.UserID = req.SendID + err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation}) + if err != nil { + log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) + } } - conversation2 := conversation - conversation2.OwnerUserID = req.RecvID - conversation2.UserID = req.SendID - err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation2}) - if err != nil { - log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) - } return &pbconversation.CreateSingleChatConversationsResp{}, nil } diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 1524a7f27..c563f77fe 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -252,7 +252,8 @@ func (s *friendServer) GetDesignatedFriends( return resp, nil } -func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) { +func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, + req *pbfriend.GetDesignatedFriendsApplyReq) (resp *pbfriend.GetDesignatedFriendsApplyResp, err error) { friendRequests, err := s.friendDatabase.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID) if err != nil { return nil, err diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index efa322774..55c3ba088 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -147,7 +147,6 @@ func (m *msgServer) MarkConversationAsRead( for i := hasReadSeq + 1; i <= req.HasReadSeq; i++ { seqs = append(seqs, i) } - if len(seqs) > 0 { log.ZDebug(ctx, "MarkConversationAsRead", "seqs", seqs, "conversationID", req.ConversationID) if err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.UserID, req.ConversationID, seqs); err != nil { @@ -165,7 +164,9 @@ func (m *msgServer) MarkConversationAsRead( m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil { return nil, err } - } else if conversation.ConversationType == constant.SuperGroupChatType { + + } else if conversation.ConversationType == constant.SuperGroupChatType || + conversation.ConversationType == constant.NotificationChatType { if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) if err != nil { diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 94ce2dec0..ca095051c 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -23,6 +23,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" @@ -35,7 +37,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" ) @@ -77,7 +78,8 @@ func InitMsgTool() (*MsgTool, error) { /* discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username, - config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/if err != nil { + config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/ + if err != nil { return nil, err } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index 7d30d2151..1238b4757 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -41,7 +41,7 @@ type SendMsgReq struct { type BatchSendMsgReq struct { SendMsg IsSendAll bool `json:"isSendAll"` - RecvIDs []string `json:"recvIDs"` + RecvIDs []string `json:"recvIDs" binding:"required"` } type BatchSendMsgResp struct { diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 30b0b4b77..df01bcb8f 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -39,7 +39,6 @@ func NewConversation(discov discoveryregistry.SvcDiscoveryRegistry) *Conversatio panic(err) } client := pbconversation.NewConversationClient(conn) - return &Conversation{discov: discov, conn: conn, Client: client} } @@ -57,13 +56,17 @@ func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Cont if err != nil { return 0, err } - return conversation.GetConversation().RecvMsgOpt, err } -func (c *ConversationRpcClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID string) error { - _, err := c.Client.CreateSingleChatConversations(ctx, &pbconversation.CreateSingleChatConversationsReq{RecvID: recvID, SendID: sendID}) - +func (c *ConversationRpcClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID, + conversationID string, conversationType int32, +) error { + _, err := c.Client.CreateSingleChatConversations(ctx, + &pbconversation.CreateSingleChatConversationsReq{ + RecvID: recvID, SendID: sendID, ConversationID: conversationID, + ConversationType: conversationType, + }) return err } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 51e29c7d8..00b0fa3f1 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -221,12 +221,13 @@ func WithRpcGetUserName() NotificationOptions { } } -func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { +func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, sendID, recvID string, + contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { n := sdkws.NotificationElem{Detail: utils.StructToJsonString(m)} content, err := json.Marshal(&n) if err != nil { - log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", sendID, "recvID", recvID, "contentType", contentType, "msg", m) - + log.ZError(ctx, "MsgClient Notification json.Marshal failed", err, "sendID", + sendID, "recvID", recvID, "contentType", contentType, "msg", m) return err } notificationOpt := ¬ificationOpt{} @@ -235,8 +236,8 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s } var req msg.SendMsgReq var msg sdkws.MsgData + var userInfo *sdkws.UserInfo if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil { - var userInfo *sdkws.UserInfo userInfo, err = s.getUserInfo(ctx, sendID) if err != nil { log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID) @@ -259,7 +260,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s msg.CreateTime = utils.GetCurrentTimestampByMill() msg.ClientMsgID = utils.GetMsgID(sendID) optionsConfig := s.contentTypeConf[contentType] - if sesstionType == constant.SuperGroupChatType && contentType == constant.HasReadReceipt { + if sendID == recvID && contentType == constant.HasReadReceipt { optionsConfig.ReliabilityLevel = constant.UnreliableNotification } options := config.GetOptionsByNotification(optionsConfig)