mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-27 05:52:29 +08:00 
			
		
		
		
	* fix: add openim server deploy Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add openim im ctl Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add more deployment docs Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * feat: add openim save images and release Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * build: set IBM sarama and set k8s deployment helm Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * fix: add openim code fix Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> * fix: add openim code fix Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com> --------- Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com>
		
			
				
	
	
		
			89 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			89 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright © 2023 OpenIM. All rights reserved.
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package data_conversion
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| )
 | |
| 
 | |
// Kafka connection settings used by the producer and consumer in this file.
var (
	// topic is the Kafka topic that SendMessage publishes to and GetMessage
	// consumes from.
	topic = "latestMsgToRedis"

	// addr is the host:port of the Kafka broker that init connects to.
	addr = "127.0.0.1:9092"
)
 | |
| 
 | |
| var (
 | |
| 	consumer sarama.Consumer
 | |
| 	producer sarama.SyncProducer
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	// Producer
 | |
| 	config := sarama.NewConfig()            // Instantiate a sarama Config
 | |
| 	config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully
 | |
| 	config.Producer.Return.Errors = true
 | |
| 	config.Producer.RequiredAcks = sarama.WaitForAll        // Set producer Message Reply level 0 1 all
 | |
| 	config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
 | |
| 
 | |
| 	client, err := sarama.NewSyncProducer([]string{addr}, config)
 | |
| 	if err != nil {
 | |
| 		fmt.Println("producer closed, err:", err)
 | |
| 	}
 | |
| 	producer = client
 | |
| 
 | |
| 	// Consumer
 | |
| 	consumerT, err := sarama.NewConsumer([]string{addr}, sarama.NewConfig())
 | |
| 	if err != nil {
 | |
| 		fmt.Printf("fail to start consumer, err:%v\n", err)
 | |
| 	}
 | |
| 	consumer = consumerT
 | |
| }
 | |
| 
 | |
| func SendMessage() {
 | |
| 	// construct a message
 | |
| 	msg := &sarama.ProducerMessage{}
 | |
| 	msg.Topic = topic
 | |
| 	msg.Value = sarama.StringEncoder("this is a test log")
 | |
| 
 | |
| 	// Send a message
 | |
| 	pid, offset, err := producer.SendMessage(msg)
 | |
| 	if err != nil {
 | |
| 		fmt.Println("send msg failed, err:", err)
 | |
| 	}
 | |
| 	fmt.Printf("pid:%v offset:%v\n", pid, offset)
 | |
| }
 | |
| 
 | |
| func GetMessage() {
 | |
| 	partitionList, err := consumer.Partitions(topic) // Get all partitions according to topic
 | |
| 	if err != nil {
 | |
| 		fmt.Printf("fail to get list of partition:err%v\n", err)
 | |
| 	}
 | |
| 	fmt.Println(partitionList)
 | |
| 	for partition := range partitionList { // iterate over all partitions
 | |
| 		// Create a corresponding partition consumer for each partition
 | |
| 		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
 | |
| 		if err != nil {
 | |
| 			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
 | |
| 		}
 | |
| 		// Asynchronously consume information from each partition
 | |
| 		go func(sarama.PartitionConsumer) {
 | |
| 			for msg := range pc.Messages() {
 | |
| 				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
 | |
| 			}
 | |
| 		}(pc)
 | |
| 	}
 | |
| }
 |