mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 05:02:11 +08:00 
			
		
		
		
	feat: add backup volume && optimize log print (#3066)
* feat: backup * feat: backup * feat: backup * feat: optimize log print
This commit is contained in:
		
							parent
							
								
									5312089069
								
							
						
					
					
						commit
						6d743330b0
					
				
							
								
								
									
										2
									
								
								.env
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								.env
									
									
									
									
									
								
							| @ -17,6 +17,8 @@ OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.8.4 | ||||
| 
 | ||||
| DATA_DIR=./ | ||||
| 
 | ||||
| MONGO_BACKUP_DIR=${DATA_DIR}components/backup/mongo/ | ||||
| 
 | ||||
| PROMETHEUS_PORT=19091 | ||||
| ALERTMANAGER_PORT=19093 | ||||
| GRAFANA_PORT=13000 | ||||
|  | ||||
| @ -37,6 +37,7 @@ services: | ||||
|       - "${DATA_DIR}/components/mongodb/data/db:/data/db" | ||||
|       - "${DATA_DIR}/components/mongodb/data/logs:/data/logs" | ||||
|       - "${DATA_DIR}/components/mongodb/data/conf:/etc/mongo" | ||||
|       - "${MONGO_BACKUP_DIR}:/data/backup" | ||||
|     environment: | ||||
|       - TZ=Asia/Shanghai | ||||
|       - wiredTigerCacheSizeGB=1 | ||||
|  | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -13,7 +13,7 @@ require ( | ||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||
| 	github.com/mitchellh/mapstructure v1.5.0 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.71 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.67 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.70 | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_golang v1.18.0 | ||||
| 	github.com/stretchr/testify v1.9.0 | ||||
|  | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF | ||||
| github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.67 h1:K7kguqvPbjldHAi7pGhcG2ERkctCqG9ZFlteT7UKaxM= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.67/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.70 h1:pyqWkJzXbELWU9KKAsWkj3g0flJYNsDTcjR5SLFQAZU= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.70/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= | ||||
| github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= | ||||
|  | ||||
| @ -3,11 +3,12 @@ package push | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 	"math/rand" | ||||
| 	"strconv" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 
 | ||||
| 	"github.com/IBM/sarama" | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" | ||||
| @ -152,24 +153,24 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg * | ||||
| 	log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) | ||||
| 	defer func(duration time.Time) { | ||||
| 		t := time.Since(duration) | ||||
| 		log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t) | ||||
| 		log.ZInfo(ctx, "Get msg from msg_transfer And push msg end", "msg", msg.String(), "time cost", t) | ||||
| 	}(time.Now()) | ||||
| 	if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	log.ZInfo(ctx, "webhookBeforeOnlinePush end") | ||||
| 
 | ||||
| 	wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs) | ||||
| 	log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs) | ||||
| 	log.ZInfo(ctx, "single and notification push end") | ||||
| 
 | ||||
| 	if !c.shouldPushOffline(ctx, msg) { | ||||
| 		return nil | ||||
| 	} | ||||
| 	log.ZInfo(ctx, "shouldPushOffline end") | ||||
| 	log.ZInfo(ctx, "pushOffline start") | ||||
| 
 | ||||
| 	for _, v := range wsResults { | ||||
| 		//message sender do not need offline push | ||||
| @ -188,14 +189,14 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg * | ||||
| 	if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserID, msg, &offlinePushUserID); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	log.ZInfo(ctx, "webhookBeforeOfflinePush end") | ||||
| 
 | ||||
| 	if len(offlinePushUserID) > 0 { | ||||
| 		needOfflinePushUserID = offlinePushUserID | ||||
| 	} | ||||
| 	err = c.offlinePushMsg(ctx, msg, needOfflinePushUserID) | ||||
| 	if err != nil { | ||||
| 		log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg) | ||||
| 		log.ZDebug(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg) | ||||
| 		log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID length", len(needOfflinePushUserID), "msg", msg) | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| @ -250,26 +251,24 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s | ||||
| 		&pushToUserIDs); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end") | ||||
| 
 | ||||
| 	err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	log.ZInfo(ctx, "groupMessagesHandler end") | ||||
| 
 | ||||
| 	wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg) | ||||
| 	log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg) | ||||
| 	log.ZInfo(ctx, "online group push end") | ||||
| 
 | ||||
| 	if !c.shouldPushOffline(ctx, msg) { | ||||
| 		return nil | ||||
| 	} | ||||
| 	needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs) | ||||
| 	log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end") | ||||
| 	//filter some user, like don not disturb or don't need offline push etc. | ||||
| 	needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs) | ||||
| 	if err != nil { | ||||
| @ -297,9 +296,11 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU | ||||
| 		needOfflinePushUserIDs = offlinePushUserIDs | ||||
| 	} | ||||
| 	if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil { | ||||
| 		log.ZError(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs", | ||||
| 		log.ZDebug(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs", | ||||
| 			needOfflinePushUserIDs, "msg", msg) | ||||
| 		prommetrics.SingleChatMsgProcessFailedCounter.Inc() | ||||
| 		log.ZWarn(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs length", | ||||
| 			len(needOfflinePushUserIDs), "msg", msg) | ||||
| 		prommetrics.GroupChatMsgProcessFailedCounter.Inc() | ||||
| 		return | ||||
| 	} | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user