mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 03:13:15 +08:00 
			
		
		
		
	fix 细节
1、统一结构体方法 receiver,都用 pointer 2、使用 errors.Is 来做错误判断 3、修复单词拼写的错误
This commit is contained in:
		
							parent
							
								
									7110183892
								
							
						
					
					
						commit
						65dfe561a7
					
				@ -49,14 +49,14 @@ func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.Use
 | 
				
			|||||||
		userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdminUserID: imAdminUserID}
 | 
							userRpcClient: rpcclient.NewUserRpcClientByUser(userRpcClient), imAdminUserID: imAdminUserID}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MessageApi) SetOptions(options map[string]bool, value bool) {
 | 
					func (*MessageApi) SetOptions(options map[string]bool, value bool) {
 | 
				
			||||||
	datautil.SetSwitchFromOptions(options, constant.IsHistory, value)
 | 
						datautil.SetSwitchFromOptions(options, constant.IsHistory, value)
 | 
				
			||||||
	datautil.SetSwitchFromOptions(options, constant.IsPersistent, value)
 | 
						datautil.SetSwitchFromOptions(options, constant.IsPersistent, value)
 | 
				
			||||||
	datautil.SetSwitchFromOptions(options, constant.IsSenderSync, value)
 | 
						datautil.SetSwitchFromOptions(options, constant.IsSenderSync, value)
 | 
				
			||||||
	datautil.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
 | 
						datautil.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq {
 | 
					func (m *MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq {
 | 
				
			||||||
	var newContent string
 | 
						var newContent string
 | 
				
			||||||
	options := make(map[string]bool, 5)
 | 
						options := make(map[string]bool, 5)
 | 
				
			||||||
	switch params.ContentType {
 | 
						switch params.ContentType {
 | 
				
			||||||
@ -231,7 +231,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Set the status to successful if the message is sent.
 | 
						// Set the status to successful if the message is sent.
 | 
				
			||||||
	var status int = constant.MsgSendSuccessed
 | 
						var status = constant.MsgSendSuccessed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Attempt to update the message sending status in the system.
 | 
						// Attempt to update the message sending status in the system.
 | 
				
			||||||
	_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
 | 
						_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
 | 
				
			||||||
 | 
				
			|||||||
@ -16,6 +16,7 @@ package msgtransfer
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
 | 
				
			||||||
@ -137,7 +138,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
 | 
				
			|||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if err := prommetrics.TransferInit(prometheusPort); err != nil && err != http.ErrServerClosed {
 | 
								if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) {
 | 
				
			||||||
				netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
 | 
									netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort)
 | 
				
			||||||
				netDone <- struct{}{}
 | 
									netDone <- struct{}{}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
				
			|||||||
@ -16,6 +16,7 @@ package msgtransfer
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
	"github.com/IBM/sarama"
 | 
						"github.com/IBM/sarama"
 | 
				
			||||||
	"github.com/go-redis/redis"
 | 
						"github.com/go-redis/redis"
 | 
				
			||||||
	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
						"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 | 
				
			||||||
@ -187,7 +188,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
 | 
				
			|||||||
	if len(storageMessageList) > 0 {
 | 
						if len(storageMessageList) > 0 {
 | 
				
			||||||
		msg := storageMessageList[0]
 | 
							msg := storageMessageList[0]
 | 
				
			||||||
		lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
 | 
							lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
 | 
				
			||||||
		if err != nil && errs.Unwrap(err) != redis.Nil {
 | 
							if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
 | 
				
			||||||
			log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
 | 
								log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
				
			|||||||
@ -91,13 +91,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
 | 
					func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
 | 
				
			||||||
func (OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 | 
					func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
 | 
					func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
 | 
				
			||||||
	sess sarama.ConsumerGroupSession,
 | 
						sess sarama.ConsumerGroupSession,
 | 
				
			||||||
	claim sarama.ConsumerGroupClaim,
 | 
						claim sarama.ConsumerGroupClaim,
 | 
				
			||||||
) error { // a instance in the consumer group
 | 
					) error { // an instance in the consumer group
 | 
				
			||||||
	log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
 | 
						log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
 | 
				
			||||||
		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
 | 
							claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
 | 
				
			||||||
	for msg := range claim.Messages() {
 | 
						for msg := range claim.Messages() {
 | 
				
			||||||
 | 
				
			|||||||
@ -19,20 +19,20 @@ type OnlinePusher interface {
 | 
				
			|||||||
		pushToUserIDs *[]string) []string
 | 
							pushToUserIDs *[]string) []string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type emptyOnlinePUsher struct{}
 | 
					type emptyOnlinePusher struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newEmptyOnlinePUsher() *emptyOnlinePUsher {
 | 
					func newEmptyOnlinePusher() *emptyOnlinePusher {
 | 
				
			||||||
	return &emptyOnlinePUsher{}
 | 
						return &emptyOnlinePusher{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (emptyOnlinePUsher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
 | 
					func (emptyOnlinePusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
 | 
				
			||||||
	pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
 | 
						pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
 | 
				
			||||||
	log.ZWarn(ctx, "emptyOnlinePUsher GetConnsAndOnlinePush", nil)
 | 
						log.ZWarn(ctx, "emptyOnlinePusher GetConnsAndOnlinePush", nil)
 | 
				
			||||||
	return nil, nil
 | 
						return nil, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
 | 
					func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData,
 | 
				
			||||||
	wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
 | 
						wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string {
 | 
				
			||||||
	log.ZWarn(ctx, "emptyOnlinePUsher GetOnlinePushFailedUserIDs", nil)
 | 
						log.ZWarn(ctx, "emptyOnlinePusher GetOnlinePushFailedUserIDs", nil)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -45,7 +45,7 @@ func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) Onli
 | 
				
			|||||||
	case "etcd":
 | 
						case "etcd":
 | 
				
			||||||
		return NewDefaultAllNode(disCov, config)
 | 
							return NewDefaultAllNode(disCov, config)
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		return newEmptyOnlinePUsher()
 | 
							return newEmptyOnlinePusher()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -154,17 +154,17 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
 | 
				
			|||||||
			return nil
 | 
								return nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	offlinePUshUserID := []string{msg.RecvID}
 | 
						offlinePushUserID := []string{msg.RecvID}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	//receiver offline push
 | 
						//receiver offline push
 | 
				
			||||||
	if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush,
 | 
						if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush,
 | 
				
			||||||
		offlinePUshUserID, msg, nil); err != nil {
 | 
							offlinePushUserID, msg, nil); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = c.offlinePushMsg(ctx, msg, offlinePUshUserID)
 | 
						err = c.offlinePushMsg(ctx, msg, offlinePushUserID)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePUshUserID", offlinePUshUserID, "msg", msg)
 | 
							log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePushUserID", offlinePushUserID, "msg", msg)
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -89,8 +89,8 @@ type CommonMsgDatabase interface {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// to mq
 | 
						// to mq
 | 
				
			||||||
	MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
 | 
						MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
 | 
				
			||||||
	MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
 | 
						MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
 | 
				
			||||||
	MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
 | 
						MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
 | 
						RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
 | 
				
			||||||
	RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
 | 
						RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
 | 
				
			||||||
 | 
				
			|||||||
@ -117,9 +117,9 @@ func (m *MsgMgo) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error) {
 | 
					func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error) {
 | 
				
			||||||
	indexs := make([]int64, 0, len(seqs))
 | 
						indexes := make([]int64, 0, len(seqs))
 | 
				
			||||||
	for _, seq := range seqs {
 | 
						for _, seq := range seqs {
 | 
				
			||||||
		indexs = append(indexs, m.model.GetMsgIndex(seq))
 | 
							indexes = append(indexes, m.model.GetMsgIndex(seq))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pipeline := mongo.Pipeline{
 | 
						pipeline := mongo.Pipeline{
 | 
				
			||||||
		bson.D{{Key: "$match", Value: bson.D{
 | 
							bson.D{{Key: "$match", Value: bson.D{
 | 
				
			||||||
@ -130,7 +130,7 @@ func (m *MsgMgo) getMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID strin
 | 
				
			|||||||
			{Key: "doc_id", Value: 1},
 | 
								{Key: "doc_id", Value: 1},
 | 
				
			||||||
			{Key: "msgs", Value: bson.D{
 | 
								{Key: "msgs", Value: bson.D{
 | 
				
			||||||
				{Key: "$map", Value: bson.D{
 | 
									{Key: "$map", Value: bson.D{
 | 
				
			||||||
					{Key: "input", Value: indexs},
 | 
										{Key: "input", Value: indexes},
 | 
				
			||||||
					{Key: "as", Value: "index"},
 | 
										{Key: "as", Value: "index"},
 | 
				
			||||||
					{Key: "in", Value: bson.D{
 | 
										{Key: "in", Value: bson.D{
 | 
				
			||||||
						{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
 | 
											{Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}},
 | 
				
			||||||
 | 
				
			|||||||
@ -92,15 +92,15 @@ type GroupCount struct {
 | 
				
			|||||||
	Count   int64  `bson:"count"`
 | 
						Count   int64  `bson:"count"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) TableName() string {
 | 
					func (*MsgDocModel) TableName() string {
 | 
				
			||||||
	return MsgTableName
 | 
						return MsgTableName
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) GetSingleGocMsgNum() int64 {
 | 
					func (*MsgDocModel) GetSingleGocMsgNum() int64 {
 | 
				
			||||||
	return singleGocMsgNum
 | 
						return singleGocMsgNum
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) GetSingleGocMsgNum5000() int64 {
 | 
					func (*MsgDocModel) GetSingleGocMsgNum5000() int64 {
 | 
				
			||||||
	return singleGocMsgNum5000
 | 
						return singleGocMsgNum5000
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -108,12 +108,12 @@ func (m *MsgDocModel) IsFull() bool {
 | 
				
			|||||||
	return m.Msg[len(m.Msg)-1].Msg != nil
 | 
						return m.Msg[len(m.Msg)-1].Msg != nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m MsgDocModel) GetDocID(conversationID string, seq int64) string {
 | 
					func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
 | 
				
			||||||
	seqSuffix := (seq - 1) / singleGocMsgNum
 | 
						seqSuffix := (seq - 1) / singleGocMsgNum
 | 
				
			||||||
	return m.indexGen(conversationID, seqSuffix)
 | 
						return m.indexGen(conversationID, seqSuffix)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
 | 
					func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
 | 
				
			||||||
	t := make(map[string][]int64)
 | 
						t := make(map[string][]int64)
 | 
				
			||||||
	for i := 0; i < len(seqs); i++ {
 | 
						for i := 0; i < len(seqs); i++ {
 | 
				
			||||||
		docID := m.GetDocID(conversationID, seqs[i])
 | 
							docID := m.GetDocID(conversationID, seqs[i])
 | 
				
			||||||
@ -127,15 +127,15 @@ func (m MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[st
 | 
				
			|||||||
	return t
 | 
						return t
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) GetMsgIndex(seq int64) int64 {
 | 
					func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
 | 
				
			||||||
	return (seq - 1) % singleGocMsgNum
 | 
						return (seq - 1) % singleGocMsgNum
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
 | 
					func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
 | 
				
			||||||
	return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
 | 
						return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
 | 
					func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
 | 
				
			||||||
	for _, v := range seqs {
 | 
						for _, v := range seqs {
 | 
				
			||||||
		msgModel := new(sdkws.MsgData)
 | 
							msgModel := new(sdkws.MsgData)
 | 
				
			||||||
		msgModel.Seq = v
 | 
							msgModel.Seq = v
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user