mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-01 00:42:13 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			35 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			35 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package kafka
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/openimsdk/protocol/constant"
 | |
| 	"github.com/openimsdk/tools/mcontext"
 | |
| )
 | |
| 
 | |
| var errEmptyMsg = errors.New("kafka binary msg is empty") // Package-level error value; not referenced in the visible part of this file — presumably returned when a Kafka message payload is empty (confirm at call sites).
 | |
| 
 | |
| // GetMQHeaderWithContext extracts message queue headers from the context.
 | |
| func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
 | |
| 	operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return []sarama.RecordHeader{
 | |
| 		{Key: []byte(constant.OperationID), Value: []byte(operationID)},
 | |
| 		{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
 | |
| 		{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
 | |
| 		{Key: []byte(constant.ConnID), Value: []byte(connID)},
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // GetContextWithMQHeader creates a context from message queue headers.
 | |
| func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
 | |
| 	var values []string
 | |
| 	for _, recordHeader := range header {
 | |
| 		values = append(values, string(recordHeader.Value))
 | |
| 	}
 | |
| 	return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
 | |
| }
 |