From 8087f705c009d30311072c203371b3cba2869c5e Mon Sep 17 00:00:00 2001
From: icey-yu <119291641+icey-yu@users.noreply.github.com>
Date: Tue, 23 Jul 2024 10:26:04 +0800
Subject: [PATCH 1/2] Fix search log (#2425)
* feat: update alert email template
* fix: search log
* fix: log support ex
---
 config/email.tmpl                     | 44 +++++++++++++++++++--------
 internal/rpc/third/log.go             |  5 +--
 pkg/common/storage/controller/user.go |  1 +
 3 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/config/email.tmpl b/config/email.tmpl
index 0385601d0..824144e9d 100644
--- a/config/email.tmpl
+++ b/config/email.tmpl
@@ -1,16 +1,36 @@
 {{ define "email.to.html" }}
-{{ range .Alerts }}
-
-
-    
OpenIM Alert
-    
Alert Program: Prometheus Alert
-    
Severity Level: {{ .Labels.severity }}
-    
Alert Type: {{ .Labels.alertname }}
-    
Affected Host: {{ .Labels.instance }}
-    
Affected Service: {{ .Labels.job }}
-    
Alert Subject: {{ .Annotations.summary }}
-    
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
-
 
+{{ if eq .Status "firing" }}
+    {{ range .Alerts }}
+    
+    
+        
OpenIM Alert
+        
Alert Status: firing
+        
Alert Program: Prometheus Alert
+        
Severity Level: {{ .Labels.severity }}
+        
Alert Type: {{ .Labels.alertname }}
+        
Affected Host: {{ .Labels.instance }}
+        
Affected Service: {{ .Labels.job }}
+        
Alert Subject: {{ .Annotations.summary }}
+        
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+    
 
+    {{ end }}
+
+
+{{ else if eq .Status "resolved" }}
+    {{ range .Alerts }}
+    
+    
+        
OpenIM Alert
+        
Alert Status: resolved
+        
Alert Program: Prometheus Alert
+        
Severity Level: {{ .Labels.severity }}
+        
Alert Type: {{ .Labels.alertname }}
+        
Affected Host: {{ .Labels.instance }}
+        
Affected Service: {{ .Labels.job }}
+        
Alert Subject: {{ .Annotations.summary }}
+        
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+    
 
+    {{ end }}
 
 {{ end }}
 {{ end }}
diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go
index 5c0b1f2e6..cd52727cb 100644
--- a/internal/rpc/third/log.go
+++ b/internal/rpc/third/log.go
@@ -50,13 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
 	platform := constant.PlatformID2Name[int(req.Platform)]
 	for _, fileURL := range req.FileURLs {
 		log := relationtb.Log{
-			Version:    req.Version,
-			SystemType: req.SystemType,
 			Platform:   platform,
 			UserID:     userID,
 			CreateTime: time.Now(),
 			Url:        fileURL.URL,
 			FileName:   fileURL.Filename,
+			SystemType: req.SystemType,
+			Version:    req.Version,
+			Ex:         req.Ex,
 		}
 		for i := 0; i < 20; i++ {
 			id := genLogID()
diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go
index 5ce8104e7..533eac78f 100644
--- a/pkg/common/storage/controller/user.go
+++ b/pkg/common/storage/controller/user.go
@@ -110,6 +110,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error
 
 // FindWithError Get the information of the specified user and return an error if the userID is not found.
 func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*model.User, err error) {
+	userIDs = datautil.Distinct(userIDs)
 	users, err = u.cache.GetUsersInfo(ctx, userIDs)
 	if err != nil {
 		return
From 49ca5c998c96692bd61acdc9e9e69b24b6773994 Mon Sep 17 00:00:00 2001
From: chao <48119764+withchao@users.noreply.github.com>
Date: Tue, 23 Jul 2024 14:20:42 +0800
Subject: [PATCH 2/2] feat: msg queue push (#2434)
* fix: GroupApplicationAcceptedNotification
* fix: GroupApplicationAcceptedNotification
* fix: NotificationUserInfoUpdate
* cicd: robot automated Change
* fix: component
* fix: getConversationInfo
* feat: cron task
* feat: cron task
* feat: cron task
* feat: cron task
* feat: cron task
* fix: minio config url recognition error
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* friend incr sync
* friend incr sync
* friend incr sync
* friend incr sync
* friend incr sync
* mage
* optimization version log
* optimization version log
* sync
* sync
* sync
* group sync
* sync option
* sync option
* refactor: replace `friend` package with `realtion`.
* refactor: update lastest commit to relation.
* sync option
* sync option
* sync option
* sync
* sync
* go.mod
* seq
* update: go mod
* refactor: change incremental to full
* feat: get full friend user ids
* feat: api and config
* seq
* group version
* merge
* seq
* seq
* seq
* fix: sort by id avoid unstable sort friends.
* group
* group
* group
* fix: sort by id avoid unstable sort friends.
* fix: sort by id avoid unstable sort friends.
* fix: sort by id avoid unstable sort friends.
* user version
* seq
* seq
* seq user
* user online
* implement minio expire delete.
* user online
* config
* fix
* fix
* implement minio expire delete logic.
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* feat: implement scheduled delete outdated object in minio.
* update gomake version
* update gomake version
* implement FindExpires pagination.
* remove unnesseary incr.
* fix uncorrect args call.
* online push
* online push
* online push
* resolving conflicts
* resolving conflicts
* test
* api prommetrics
* api prommetrics
* api prommetrics
* api prommetrics
* api prommetrics
* rpc prommetrics
* rpc prommetrics
* online status
* online status
* online status
* online status
* sub
* conversation version incremental
* merge seq
* merge online
* merge online
* merge online
* merge seq
* GetOwnerConversation
* fix: change incremental syncer router name.
* rockscache batch get
* rockscache seq batch get
* fix: GetMsgDocModelByIndex bug
* update go.mod
* update go.mod
* merge
* feat: prometheus
* feat: prometheus
* group member sort
* sub
* sub
* fix: seq conversion bug
* fix: redis pipe exec
* sort version
* sort version
* sort version
* remove old version online subscription
* remove old version online subscription
* version log index
* version log index
* batch push
* batch push
---------
Co-authored-by: withchao 
Co-authored-by: Monet Lee 
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
---
 go.mod                            |   2 +-
 go.sum                            |   4 +-
 internal/msggateway/hub_server.go | 128 ++++++++++++++++++++----------
 3 files changed, 89 insertions(+), 45 deletions(-)
diff --git a/go.mod b/go.mod
index 0637492eb..fa40effdd 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/openimsdk/protocol v0.0.69-alpha.38
-	github.com/openimsdk/tools v0.0.49-alpha.51
+	github.com/openimsdk/tools v0.0.49-alpha.52
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.18.0
 	github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index 92a783c06..cc0f5f766 100644
--- a/go.sum
+++ b/go.sum
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
 github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
 github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
 github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
-github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0=
+github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go
index 3891aa532..28c227162 100644
--- a/internal/msggateway/hub_server.go
+++ b/internal/msggateway/hub_server.go
@@ -22,11 +22,15 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 	"github.com/openimsdk/protocol/constant"
 	"github.com/openimsdk/protocol/msggateway"
+	"github.com/openimsdk/protocol/sdkws"
 	"github.com/openimsdk/tools/discovery"
 	"github.com/openimsdk/tools/errs"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
+	"github.com/openimsdk/tools/mq/memamq"
+	"github.com/openimsdk/tools/utils/datautil"
 	"google.golang.org/grpc"
+	"sync/atomic"
 )
 
 func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -57,6 +61,7 @@ type Server struct {
 	pushTerminal   map[int]struct{}
 	ready          func(srv *Server) error
 	userRcp        rpcclient.UserRpcClient
+	queue          *memamq.MemoryQueue
 }
 
 func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
 		pushTerminal:   make(map[int]struct{}),
 		config:         conf,
 		ready:          ready,
+		queue:          memamq.NewMemoryQueue(512, 1024*16),
 	}
 	s.pushTerminal[constant.IOSPlatformID] = struct{}{}
 	s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
 	return nil, nil
 }
 
-func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
-) (*msggateway.OnlineBatchPushOneMsgResp, error) {
-	var singleUserResults []*msggateway.SingleMsgToUserResults
-	for _, v := range req.PushToUserIDs {
-		var resp []*msggateway.SingleMsgToUserPlatform
-		results := &msggateway.SingleMsgToUserResults{
-			UserID: v,
+func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
+	clients, ok := s.LongConnServer.GetUserAllCons(userID)
+	if !ok {
+		log.ZDebug(ctx, "push user not online", "userID", userID)
+		return &msggateway.SingleMsgToUserResults{
+			UserID: userID,
 		}
-		clients, ok := s.LongConnServer.GetUserAllCons(v)
-		if !ok {
-			log.ZDebug(ctx, "push user not online", "userID", v)
-			results.Resp = resp
-			singleUserResults = append(singleUserResults, results)
+	}
+	log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
+	result := &msggateway.SingleMsgToUserResults{
+		UserID: userID,
+		Resp:   make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
+	}
+	for _, client := range clients {
+		if client == nil {
 			continue
 		}
-
-		log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
-		for _, client := range clients {
-			if client == nil {
-				continue
-			}
-
-			userPlatform := &msggateway.SingleMsgToUserPlatform{
-				RecvPlatFormID: int32(client.PlatformID),
-			}
-			if !client.IsBackground ||
-				(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
-				err := client.PushMessage(ctx, req.MsgData)
-				if err != nil {
-					userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
-					resp = append(resp, userPlatform)
-				} else {
-					if _, ok := s.pushTerminal[client.PlatformID]; ok {
-						results.OnlinePush = true
-						resp = append(resp, userPlatform)
-					}
-				}
+		userPlatform := &msggateway.SingleMsgToUserPlatform{
+			RecvPlatFormID: int32(client.PlatformID),
+		}
+		if !client.IsBackground ||
+			(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
+			err := client.PushMessage(ctx, msgData)
+			if err != nil {
+				userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
 			} else {
-				userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
-				resp = append(resp, userPlatform)
+				if _, ok := s.pushTerminal[client.PlatformID]; ok {
+					result.OnlinePush = true
+				}
+			}
+		} else {
+			userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
+		}
+		result.Resp = append(result.Resp, userPlatform)
+	}
+	return result
+}
+
+func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
+	if len(req.PushToUserIDs) == 0 {
+		return &msggateway.OnlineBatchPushOneMsgResp{}, nil
+	}
+	ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
+	var count atomic.Int64
+	count.Add(int64(len(req.PushToUserIDs)))
+	for i := range req.PushToUserIDs {
+		userID := req.PushToUserIDs[i]
+		err := s.queue.PushCtx(ctx, func() {
+			ch <- s.pushToUser(ctx, userID, req.MsgData)
+			if count.Add(-1) == 0 {
+				close(ch)
+			}
+		})
+		if err != nil {
+			if count.Add(-1) == 0 {
+				close(ch)
+			}
+			log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
+			ch <- &msggateway.SingleMsgToUserResults{
+				UserID: userID,
 			}
 		}
-		results.Resp = resp
-		singleUserResults = append(singleUserResults, results)
 	}
-
-	return &msggateway.OnlineBatchPushOneMsgResp{
-		SinglePushResult: singleUserResults,
-	}, nil
+	resp := &msggateway.OnlineBatchPushOneMsgResp{
+		SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
+			userIDSet := datautil.SliceSet(req.PushToUserIDs)
+			for _, results := range resp.SinglePushResult {
+				delete(userIDSet, results.UserID)
+			}
+			for userID := range userIDSet {
+				resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
+					UserID: userID,
+				})
+			}
+			return resp, nil
+		case res, ok := <-ch:
+			if !ok {
+				return resp, nil
+			}
+			resp.SinglePushResult = append(resp.SinglePushResult, res)
+		}
+	}
 }
 
 func (s *Server) KickUserOffline(