mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-11-05 21:02:11 +08:00
fix: discov update.
This commit is contained in:
parent
5cc371b6ca
commit
5823b6925b
@ -16,6 +16,8 @@ package msgtransfer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/tools/utils/idutil"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -34,7 +36,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils"
|
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -347,7 +348,7 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|||||||
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
|
||||||
for uniqueKey, v := range aggregationMsgs {
|
for uniqueKey, v := range aggregationMsgs {
|
||||||
if len(v) >= 0 {
|
if len(v) >= 0 {
|
||||||
hashCode := utils.GetHashCode(uniqueKey)
|
hashCode := stringutil.GetHashCode(uniqueKey)
|
||||||
channelID := hashCode % ChannelNum
|
channelID := hashCode % ChannelNum
|
||||||
newCtx := withAggregationCtx(ctx, v)
|
newCtx := withAggregationCtx(ctx, v)
|
||||||
log.ZDebug(
|
log.ZDebug(
|
||||||
@ -438,7 +439,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
|
|||||||
rwLock.Unlock()
|
rwLock.Unlock()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
|
ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator())
|
||||||
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
|
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
|
||||||
for i := 0; i < len(buffer)/split; i++ {
|
for i := 0; i < len(buffer)/split; i++ {
|
||||||
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
|
||||||
|
|||||||
@ -16,6 +16,7 @@ package push
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -23,7 +24,6 @@ import (
|
|||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||||
@ -65,7 +65,7 @@ func callbackOfflinePush(ctx context.Context, callback *config.Callback, userIDs
|
|||||||
}
|
}
|
||||||
|
|
||||||
func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error {
|
func callbackOnlinePush(ctx context.Context, callback *config.Callback, userIDs []string, msg *sdkws.MsgData) error {
|
||||||
if !callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
|
if !callback.CallbackOnlinePush.Enable || datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := callbackstruct.CallbackBeforePushReq{
|
req := callbackstruct.CallbackBeforePushReq{
|
||||||
|
|||||||
@ -16,6 +16,8 @@ package push
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"github.com/openimsdk/tools/utils/timeutil"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
@ -24,7 +26,6 @@ import (
|
|||||||
pbchat "github.com/openimsdk/protocol/msg"
|
pbchat "github.com/openimsdk/protocol/msg"
|
||||||
pbpush "github.com/openimsdk/protocol/push"
|
pbpush "github.com/openimsdk/protocol/push"
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/utils"
|
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
|||||||
ConversationID: msgFromMQ.ConversationID,
|
ConversationID: msgFromMQ.ConversationID,
|
||||||
}
|
}
|
||||||
sec := msgFromMQ.MsgData.SendTime / 1000
|
sec := msgFromMQ.MsgData.SendTime / 1000
|
||||||
nowSec := utils.GetCurrentTimestampBySecond()
|
nowSec := timeutil.GetCurrentTimestampBySecond()
|
||||||
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
|
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
|
||||||
if nowSec-sec > 10 {
|
if nowSec-sec > 10 {
|
||||||
return
|
return
|
||||||
@ -66,7 +67,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
|||||||
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||||
default:
|
default:
|
||||||
var pushUserIDList []string
|
var pushUserIDList []string
|
||||||
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
||||||
if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
|
if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
|
||||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -16,6 +16,8 @@ package push
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||||
@ -24,9 +26,7 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
pbpush "github.com/openimsdk/protocol/push"
|
pbpush "github.com/openimsdk/protocol/push"
|
||||||
"github.com/openimsdk/tools/discoveryregistry"
|
|
||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/utils"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -34,7 +34,7 @@ type pushServer struct {
|
|||||||
pusher *Pusher
|
pusher *Pusher
|
||||||
}
|
}
|
||||||
|
|
||||||
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||||
rdb, err := cache.NewRedis(ctx, &config.Redis)
|
rdb, err := cache.NewRedis(ctx, &config.Redis)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -80,7 +80,7 @@ func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (re
|
|||||||
err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
|
||||||
default:
|
default:
|
||||||
var pushUserIDList []string
|
var pushUserIDList []string
|
||||||
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
|
||||||
if !isSenderSync {
|
if !isSenderSync {
|
||||||
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
46
pkg/util/conversationutil/conversationutil.go
Normal file
46
pkg/util/conversationutil/conversationutil.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package conversationutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GenConversationIDForSingle(sendID, recvID string) string {
|
||||||
|
l := []string{sendID, recvID}
|
||||||
|
sort.Strings(l)
|
||||||
|
return "si_" + strings.Join(l, "_")
|
||||||
|
}
|
||||||
|
|
||||||
|
func GenConversationUniqueKeyForGroup(groupID string) string {
|
||||||
|
return groupID
|
||||||
|
}
|
||||||
|
|
||||||
|
func GenGroupConversationID(groupID string) string {
|
||||||
|
return "sg_" + groupID
|
||||||
|
}
|
||||||
|
|
||||||
|
func GenConversationUniqueKeyForSingle(sendID, recvID string) string {
|
||||||
|
l := []string{sendID, recvID}
|
||||||
|
sort.Strings(l)
|
||||||
|
return strings.Join(l, "_")
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetNotificationConversationIDByConversationID(conversationID string) string {
|
||||||
|
l := strings.Split(conversationID, "_")
|
||||||
|
if len(l) > 1 {
|
||||||
|
l[0] = "n"
|
||||||
|
return strings.Join(l, "_")
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSelfNotificationConversationID(userID string) string {
|
||||||
|
return "n_" + userID + "_" + userID
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSeqsBeginEnd(seqs []int64) (int64, int64) {
|
||||||
|
if len(seqs) == 0 {
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
return seqs[0], seqs[len(seqs)-1]
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user