mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
commit
ea39ae5ce2
@ -20,7 +20,7 @@ import (
|
|||||||
func NewThird(discov discoveryregistry.SvcDiscoveryRegistry) *Third {
|
func NewThird(discov discoveryregistry.SvcDiscoveryRegistry) *Third {
|
||||||
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImThirdName)
|
conn, err := discov.GetConn(context.Background(), config.Config.RpcRegisterName.OpenImThirdName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
// panic(err)
|
||||||
}
|
}
|
||||||
return &Third{conn: conn, discov: discov}
|
return &Third{conn: conn, discov: discov}
|
||||||
}
|
}
|
||||||
|
@ -51,9 +51,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
|||||||
err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData)
|
err = c.pusher.Push2User(ctx, []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}, pbData.MsgData)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == errNoOfflinePusher {
|
||||||
|
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
|
||||||
|
} else {
|
||||||
log.ZError(ctx, "push failed", err, "msg", pbData.String())
|
log.ZError(ctx, "push failed", err, "msg", pbData.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||||
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
|
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -52,7 +53,11 @@ func (r *pushServer) PushMsg(ctx context.Context, pbData *pbPush.PushMsgReq) (re
|
|||||||
err = r.pusher.Push2User(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}, pbData.MsgData)
|
err = r.pusher.Push2User(ctx, []string{pbData.MsgData.RecvID, pbData.MsgData.SendID}, pbData.MsgData)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err != errNoOfflinePusher {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
} else {
|
||||||
|
log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &pbPush.PushMsgResp{}, nil
|
return &pbPush.PushMsgResp{}, nil
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
|
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm"
|
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui"
|
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui"
|
||||||
@ -35,6 +36,8 @@ type Pusher struct {
|
|||||||
successCount int
|
successCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
|
||||||
|
|
||||||
func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
|
func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
|
||||||
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher {
|
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher {
|
||||||
rpcclient.NewGroupClient(client)
|
rpcclient.NewGroupClient(client)
|
||||||
@ -286,7 +289,7 @@ func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts,
|
|||||||
|
|
||||||
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
|
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
|
||||||
if p.offlinePusher == nil {
|
if p.offlinePusher == nil {
|
||||||
err = errors.New("no offlinePusher is configured")
|
err = errNoOfflinePusher
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
type AtContent struct {
|
type AtContent struct {
|
||||||
|
@ -3,9 +3,41 @@ package msg
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) {
|
func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (resp *msg.MarkMsgsAsReadResp, err error) {
|
||||||
|
conversations, err := m.Conversation.GetConversationsByConversationID(ctx, []string{req.ConversationID})
|
||||||
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
var recvID string
|
||||||
|
if conversations[0].ConversationType == constant.SingleChatType || conversations[0].ConversationType == constant.NotificationChatType {
|
||||||
|
if req.UserID == conversations[0].OwnerUserID {
|
||||||
|
recvID = conversations[0].UserID
|
||||||
|
} else {
|
||||||
|
recvID = conversations[0].OwnerUserID
|
||||||
|
}
|
||||||
|
} else if conversations[0].ConversationType == constant.SuperGroupChatType {
|
||||||
|
recvID = conversations[0].GroupID
|
||||||
|
}
|
||||||
|
err = m.MsgDatabase.MarkSingleChatMsgsAsRead(ctx, req.ConversationID, req.UserID, req.Seqs)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = m.sendMarkAsReadNotification(ctx, req.ConversationID, req.UserID, recvID, req.Seqs); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return &msg.MarkMsgsAsReadResp{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sendID, recvID string, seqs []int64) error {
|
||||||
|
// tips := &sdkws.MarkAsReadTips{
|
||||||
|
// MarkAsReadUserID: sendID,
|
||||||
|
// ConversationID: conversationID,
|
||||||
|
// Seqs: seqs,
|
||||||
|
// }
|
||||||
|
// m.notificationSender.NotificationWithSesstionType(ctx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -34,6 +34,8 @@ type CommonMsgDatabase interface {
|
|||||||
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
||||||
// 撤回消息
|
// 撤回消息
|
||||||
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
|
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
|
||||||
|
// mark as read
|
||||||
|
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
|
||||||
// 刪除redis中消息缓存
|
// 刪除redis中消息缓存
|
||||||
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
|
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
|
||||||
// incrSeq然后批量插入缓存
|
// incrSeq然后批量插入缓存
|
||||||
@ -297,6 +299,10 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin
|
|||||||
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
|
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error {
|
||||||
|
return db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, conversationID, seqs)
|
||||||
|
}
|
||||||
|
|
||||||
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
|
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
|
||||||
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
|
return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package relation
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/ormutil"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/ormutil"
|
||||||
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||||
@ -51,7 +52,7 @@ func (b *BlackGorm) FindOwnerBlacks(ctx context.Context, ownerUserID string, pag
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, utils.Wrap(err, "")
|
return nil, 0, utils.Wrap(err, "")
|
||||||
}
|
}
|
||||||
totalUint32, blacks, err := ormutil.GormPage[relation.BlackModel](b.db(ctx), pageNumber, showNumber)
|
totalUint32, blacks, err := ormutil.GormPage[relation.BlackModel](b.db(ctx).Where("owner_user_id = ?", ownerUserID), pageNumber, showNumber)
|
||||||
total = int64(totalUint32)
|
total = int64(totalUint32)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,7 @@ type MsgDocModelInterface interface {
|
|||||||
DeleteDocs(ctx context.Context, docIDs []string) error
|
DeleteDocs(ctx context.Context, docIDs []string) error
|
||||||
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error)
|
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error)
|
||||||
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
|
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
|
||||||
|
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (MsgDocModel) TableName() string {
|
func (MsgDocModel) TableName() string {
|
||||||
|
@ -5,9 +5,10 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||||
|
|
||||||
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||||
@ -278,3 +279,12 @@ func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool,
|
|||||||
}
|
}
|
||||||
return count > 0, nil
|
return count > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error {
|
||||||
|
indexs := make([]int64, 0, len(seqs))
|
||||||
|
for _, seq := range seqs {
|
||||||
|
indexs = append(indexs, m.model.GetMsgIndex(seq))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -181,11 +181,12 @@ message MsgData {
|
|||||||
int64 sendTime = 15;
|
int64 sendTime = 15;
|
||||||
int64 createTime = 16;
|
int64 createTime = 16;
|
||||||
int32 status = 17;
|
int32 status = 17;
|
||||||
map<string, bool> options = 18;
|
bool isRead = 18;
|
||||||
OfflinePushInfo offlinePushInfo = 19;
|
map<string, bool> options = 19;
|
||||||
repeated string atUserIDList = 20;
|
OfflinePushInfo offlinePushInfo = 20;
|
||||||
string attachedInfo = 21;
|
repeated string atUserIDList = 21;
|
||||||
string ex = 22;
|
string attachedInfo = 22;
|
||||||
|
string ex = 23;
|
||||||
}
|
}
|
||||||
message PushMessages{
|
message PushMessages{
|
||||||
map<string, PullMsgs> msgs = 1;
|
map<string, PullMsgs> msgs = 1;
|
||||||
@ -453,6 +454,12 @@ message DeleteMsgsTips {
|
|||||||
repeated int64 seqs = 3;
|
repeated int64 seqs = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message MarkAsReadTips {
|
||||||
|
string markAsReadUserID = 1;
|
||||||
|
string conversationID = 2;
|
||||||
|
repeated int64 seqs = 3;
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////signal//////////////
|
///////////////////signal//////////////
|
||||||
message SignalReq {
|
message SignalReq {
|
||||||
oneof payload {
|
oneof payload {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user