diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 21e8e3023..6e9b8787b 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -17,6 +17,7 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" + "google.golang.org/grpc" "strings" ) @@ -32,11 +33,16 @@ type AtContent struct { IsAtSelf bool `json:"isAtSelf"` } +var grpcCons []*grpc.ClientConn + func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingleMsgToUser isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) - grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + if len(grpcCons) == 0 { + log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ") + grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) + } //Online push message log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) for _, v := range grpcCons { diff --git a/test/mongo/cmd/main.go b/test/mongo/cmd/main.go new file mode 100644 index 000000000..2a8d41523 --- /dev/null +++ b/test/mongo/cmd/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "Open_IM/pkg/common/config" + mongo2 "Open_IM/test/mongo" + "context" + "flag" + "fmt" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func init() { + uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority" + if config.Config.Mongo.DBUri != "" { + // example: mongodb://$user:$password@mongo1.mongo:27017,mongo2.mongo:27017,mongo3.mongo:27017/$DBDatabase/?replicaSet=rs0&readPreference=secondary&authSource=admin&maxPoolSize=$DBMaxPoolSize + uri = config.Config.Mongo.DBUri + } else { + if config.Config.Mongo.DBPassword != "" && config.Config.Mongo.DBUserName != "" { + uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d", config.Config.Mongo.DBUserName, config.Config.Mongo.DBPassword, config.Config.Mongo.DBAddress[0], + config.Config.Mongo.DBDatabase, config.Config.Mongo.DBMaxPoolSize) + } else { + uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d", + config.Config.Mongo.DBAddress[0], config.Config.Mongo.DBDatabase, + config.Config.Mongo.DBMaxPoolSize) + } + } + var err error + mongo2.Client, err = mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) + if err != nil { + panic(err) + } + err = mongo2.Client.Ping(context.TODO(), nil) + if err != nil { + panic(err) + } + fmt.Println("Connected to MongoDB!") +} + +func main() { + userID := flag.String("userID", "", "userID") + flag.Parse() + fmt.Println("userID:", *userID) + mongo2.GetUserAllChat(*userID) +} diff --git a/test/mongo/mongo_utils.go b/test/mongo/mongo_utils.go new file mode 100644 index 000000000..748cfc9fb --- /dev/null +++ b/test/mongo/mongo_utils.go @@ -0,0 +1,42 @@ +package mongo + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/db" + server_api_params "Open_IM/pkg/proto/sdk_ws" + "context" + "fmt" + "github.com/golang/protobuf/proto" + "go.mongodb.org/mongo-driver/mongo" + "gopkg.in/mgo.v2/bson" + "time" +) + +var ( + Client *mongo.Client +) + +func GetUserAllChat(uid string) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + collection := Client.Database(config.Config.Mongo.DBDatabase).Collection("msg") + var userChatList []db.UserChat + result, err := collection.Find(context.Background(), bson.M{"uid": bson.M{"$regex": uid}}) + if err != nil { + fmt.Println("find error", err.Error()) + return + } + if err := result.All(ctx, &userChatList); err != nil { + fmt.Println(err.Error()) + } + for _, userChat := range userChatList { + for _, msg := range userChat.Msg { + msgData := &server_api_params.MsgData{} + err := proto.Unmarshal(msg.Msg, msgData) + if err != nil { + fmt.Println(err.Error(), msg) + continue + } + fmt.Println("sendID: ", msgData.SendID, "recvID: ", msgData.RecvID, "seq:", msgData.Seq, "status:", msgData.Status) + } + } +}