mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-10-28 06:22:12 +08:00
Merge remote-tracking branch 'refs/remotes/upstream/main'
# Conflicts: # go.mod # go.sum
This commit is contained in:
commit
e3b808e9d4
@ -4,3 +4,4 @@ password: openIM123
|
|||||||
clusterMode: false
|
clusterMode: false
|
||||||
db: 0
|
db: 0
|
||||||
maxRetry: 10
|
maxRetry: 10
|
||||||
|
poolSize: 100
|
||||||
|
|||||||
8
go.sum
8
go.sum
@ -319,10 +319,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.11 h1:SlSG80C3Y0iOXlsbnh7ZqE9imMoBRy9i+9ebwsbSqfM=
|
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
|
||||||
github.com/openimsdk/protocol v0.0.72-alpha.11/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
|
github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
|
||||||
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||||
|
|||||||
@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
||||||
|
for _, storageMsg := range storageList {
|
||||||
|
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
|
||||||
|
}
|
||||||
|
|
||||||
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
||||||
var storageMessageList []*sdkws.MsgData
|
var storageMessageList []*sdkws.MsgData
|
||||||
for _, msg := range storageList {
|
for _, msg := range storageList {
|
||||||
@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
|
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
|
||||||
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,7 @@ package dummy
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewClient() *Dummy {
|
func NewClient() *Dummy {
|
||||||
@ -27,5 +28,6 @@ type Dummy struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
|
func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
|
||||||
|
log.ZInfo(ctx, "dummy push")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -93,7 +93,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
|||||||
nowSec := timeutil.GetCurrentTimestampBySecond()
|
nowSec := timeutil.GetCurrentTimestampBySecond()
|
||||||
|
|
||||||
if nowSec-sec > 10 {
|
if nowSec-sec > 10 {
|
||||||
log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
prommetrics.MsgLoneTimePushCounter.Inc()
|
||||||
|
log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
switch msgFromMQ.MsgData.SessionType {
|
switch msgFromMQ.MsgData.SessionType {
|
||||||
|
|||||||
@ -1028,12 +1028,12 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
|
|||||||
}
|
}
|
||||||
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID})
|
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "GetGroupMemberIDs", err)
|
log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
|
||||||
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
|
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
|
||||||
log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation)
|
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
|
g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
|
||||||
@ -1125,33 +1125,36 @@ func (g *groupServer) SetGroupInfoEX(ctx context.Context, req *pbgroup.SetGroupI
|
|||||||
if req.GroupInfoForSetEX.Notification != nil {
|
if req.GroupInfoForSetEX.Notification != nil {
|
||||||
num--
|
num--
|
||||||
|
|
||||||
func() {
|
if req.GroupInfoForSetEX.Notification.Value != "" {
|
||||||
conversation := &pbconversation.ConversationReq{
|
func() {
|
||||||
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID),
|
conversation := &pbconversation.ConversationReq{
|
||||||
ConversationType: constant.ReadGroupChatType,
|
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSetEX.GroupID),
|
||||||
GroupID: req.GroupInfoForSetEX.GroupID,
|
ConversationType: constant.ReadGroupChatType,
|
||||||
}
|
GroupID: req.GroupInfoForSetEX.GroupID,
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSetEX.GroupID})
|
resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSetEX.GroupID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "GetGroupMemberIDs", err)
|
log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}
|
||||||
|
|
||||||
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
|
if err := g.conversationRpcClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
|
||||||
log.ZWarn(ctx, "SetConversations", err, resp.UserIDs, conversation)
|
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
|
g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.GroupInfoForSetEX.GroupName != "" {
|
if req.GroupInfoForSetEX.GroupName != "" {
|
||||||
num--
|
num--
|
||||||
|
|
||||||
g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser})
|
g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser})
|
||||||
}
|
}
|
||||||
|
|
||||||
if num > 0 {
|
if num > 0 {
|
||||||
g.notification.GroupInfoSetNotification(ctx, tips)
|
g.notification.GroupInfoSetNotification(ctx, tips)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -312,16 +312,20 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel
|
|||||||
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
if err := s.userRpcClient.Access(ctx, req.UserID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination)
|
total, friendRequests, err := s.db.PageFriendRequestToMe(ctx, req.UserID, req.Pagination)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp = &relation.GetPaginationFriendsApplyToResp{}
|
resp = &relation.GetPaginationFriendsApplyToResp{}
|
||||||
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
|
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Total = int32(total)
|
resp.Total = int32(total)
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -129,10 +129,11 @@ func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
|
func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
|
||||||
err := log.InitFromConfig(
|
err := log.InitLoggerFromConfig(
|
||||||
|
|
||||||
cmdOpts.loggerPrefixName,
|
cmdOpts.loggerPrefixName,
|
||||||
r.processName,
|
r.processName,
|
||||||
|
"", "",
|
||||||
r.log.RemainLogLevel,
|
r.log.RemainLogLevel,
|
||||||
r.log.IsStdout,
|
r.log.IsStdout,
|
||||||
r.log.IsJson,
|
r.log.IsJson,
|
||||||
|
|||||||
@ -336,7 +336,8 @@ type Redis struct {
|
|||||||
Password string `mapstructure:"password"`
|
Password string `mapstructure:"password"`
|
||||||
ClusterMode bool `mapstructure:"clusterMode"`
|
ClusterMode bool `mapstructure:"clusterMode"`
|
||||||
DB int `mapstructure:"storage"`
|
DB int `mapstructure:"storage"`
|
||||||
MaxRetry int `mapstructure:"MaxRetry"`
|
MaxRetry int `mapstructure:"maxRetry"`
|
||||||
|
PoolSize int `mapstructure:"poolSize"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type BeforeConfig struct {
|
type BeforeConfig struct {
|
||||||
@ -474,6 +475,7 @@ func (r *Redis) Build() *redisutil.Config {
|
|||||||
Password: r.Password,
|
Password: r.Password,
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
MaxRetry: r.MaxRetry,
|
MaxRetry: r.MaxRetry,
|
||||||
|
PoolSize: r.PoolSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -23,4 +23,8 @@ var (
|
|||||||
Name: "msg_offline_push_failed_total",
|
Name: "msg_offline_push_failed_total",
|
||||||
Help: "The number of msg failed offline pushed",
|
Help: "The number of msg failed offline pushed",
|
||||||
})
|
})
|
||||||
|
MsgLoneTimePushCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "msg_long_time_push_total",
|
||||||
|
Help: "The number of messages with a push time exceeding 10 seconds",
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|||||||
@ -47,9 +47,17 @@ func GetGrpcCusMetrics(registerName string, share *config.Share) []prometheus.Co
|
|||||||
case share.RpcRegisterName.MessageGateway:
|
case share.RpcRegisterName.MessageGateway:
|
||||||
return []prometheus.Collector{OnlineUserGauge}
|
return []prometheus.Collector{OnlineUserGauge}
|
||||||
case share.RpcRegisterName.Msg:
|
case share.RpcRegisterName.Msg:
|
||||||
return []prometheus.Collector{SingleChatMsgProcessSuccessCounter, SingleChatMsgProcessFailedCounter, GroupChatMsgProcessSuccessCounter, GroupChatMsgProcessFailedCounter}
|
return []prometheus.Collector{
|
||||||
|
SingleChatMsgProcessSuccessCounter,
|
||||||
|
SingleChatMsgProcessFailedCounter,
|
||||||
|
GroupChatMsgProcessSuccessCounter,
|
||||||
|
GroupChatMsgProcessFailedCounter,
|
||||||
|
}
|
||||||
case share.RpcRegisterName.Push:
|
case share.RpcRegisterName.Push:
|
||||||
return []prometheus.Collector{MsgOfflinePushFailedCounter}
|
return []prometheus.Collector{
|
||||||
|
MsgOfflinePushFailedCounter,
|
||||||
|
MsgLoneTimePushCounter,
|
||||||
|
}
|
||||||
case share.RpcRegisterName.Auth:
|
case share.RpcRegisterName.Auth:
|
||||||
return []prometheus.Collector{UserLoginCounter}
|
return []prometheus.Collector{UserLoginCounter}
|
||||||
case share.RpcRegisterName.User:
|
case share.RpcRegisterName.User:
|
||||||
|
|||||||
@ -118,7 +118,7 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin
|
|||||||
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
|
v, err := rcClient.Fetch2(ctx, key, expire, func() (s string, err error) {
|
||||||
t, err = fn(ctx)
|
t, err = fn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "getCache query database failed", err, "key", key)
|
//log.ZError(ctx, "getCache query database failed", err, "key", key)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
bs, err := json.Marshal(t)
|
bs, err := json.Marshal(t)
|
||||||
|
|||||||
@ -86,7 +86,7 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val)
|
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val)
|
||||||
} else {
|
} else {
|
||||||
log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID)
|
log.ZWarn(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
var cache cacheProto[pbconversation.Conversation]
|
var cache cacheProto[pbconversation.Conversation]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user