mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-13 16:47:36 +08:00
tools for conversion
This commit is contained in:
parent
05303eb7c0
commit
37d9b18fb6
@ -17,7 +17,6 @@ package data_conversion
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
|
||||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||||
"github.com/OpenIMSDK/protocol/sdkws"
|
"github.com/OpenIMSDK/protocol/sdkws"
|
||||||
openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
|
||||||
@ -37,6 +36,13 @@ var (
|
|||||||
//addr = "43.128.72.19:9092"
|
//addr = "43.128.72.19:9092"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ZkAddr = "127.0.0.1:2181"
|
||||||
|
ZKSchema = "openim"
|
||||||
|
ZKUsername = ""
|
||||||
|
ZKPassword = ""
|
||||||
|
)
|
||||||
|
|
||||||
var consumer sarama.Consumer
|
var consumer sarama.Consumer
|
||||||
var producer sarama.SyncProducer
|
var producer sarama.SyncProducer
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -96,7 +102,13 @@ func GetMessage() {
|
|||||||
go func(sarama.PartitionConsumer) {
|
go func(sarama.PartitionConsumer) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for msg := range pc.Messages() {
|
for msg := range pc.Messages() {
|
||||||
Transfer([]*sarama.ConsumerMessage{msg})
|
//Transfer([]*sarama.ConsumerMessage{msg})
|
||||||
|
msgFromMQ := &sdkws.MsgData{}
|
||||||
|
err := proto.Unmarshal(msg.Value, msgFromMQ)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("err:%s", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("msg:%s", msgFromMQ)
|
||||||
//fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
|
//fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
|
||||||
}
|
}
|
||||||
}(pc)
|
}(pc)
|
||||||
@ -138,17 +150,10 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
ZkAddr = "127.0.0.1:2181"
|
|
||||||
ZKSchema = "openim"
|
|
||||||
ZKUsername = ""
|
|
||||||
ZKPassword = ""
|
|
||||||
)
|
|
||||||
|
|
||||||
func GetMsgRpcService() (rpcclient.MessageRpcClient, error) {
|
func GetMsgRpcService() (rpcclient.MessageRpcClient, error) {
|
||||||
client, err := openKeeper.NewClient([]string{ZkAddr}, ZKSchema,
|
client, err := openKeeper.NewClient([]string{ZkAddr}, ZKSchema,
|
||||||
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKPassword,
|
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKUsername,
|
||||||
config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
|
ZKPassword), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
|
||||||
msgClient := rpcclient.NewMessageRpcClient(client)
|
msgClient := rpcclient.NewMessageRpcClient(client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return msgClient, errs.Wrap(err)
|
return msgClient, errs.Wrap(err)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user