mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	test
This commit is contained in:
		
							parent
							
								
									3ecc1816be
								
							
						
					
					
						commit
						dc6b0b4dde
					
				@ -98,19 +98,24 @@ func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error
 | 
			
		||||
func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
 | 
			
		||||
func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
 | 
			
		||||
	claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
 | 
			
		||||
	log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
 | 
			
		||||
	select {
 | 
			
		||||
	case cmd := <-mc.cmdCh:
 | 
			
		||||
		if cmd.Cmd == OnlineTopicVacancy {
 | 
			
		||||
			for msg := range claim.Messages() {
 | 
			
		||||
				if GetOnlineTopicStatus() == OnlineTopicVacancy {
 | 
			
		||||
					log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
 | 
			
		||||
					mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
 | 
			
		||||
					sess.MarkMessage(msg, "")
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition())
 | 
			
		||||
	for msg := range claim.Messages() {
 | 
			
		||||
		log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "offline")
 | 
			
		||||
		mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
 | 
			
		||||
	}
 | 
			
		||||
	//select {
 | 
			
		||||
	//case cmd := <-mc.cmdCh:
 | 
			
		||||
	//	if cmd.Cmd == OnlineTopicVacancy {
 | 
			
		||||
	//		for msg := range claim.Messages() {
 | 
			
		||||
	//			if GetOnlineTopicStatus() == OnlineTopicVacancy {
 | 
			
		||||
	//				log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
 | 
			
		||||
	//				mc.msgHandle[msg.Topic](msg.Value, string(msg.Key))
 | 
			
		||||
	//				sess.MarkMessage(msg, "")
 | 
			
		||||
	//			}
 | 
			
		||||
	//		}
 | 
			
		||||
	//	}
 | 
			
		||||
	//
 | 
			
		||||
	//}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user