mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-31 08:29:33 +08:00 
			
		
		
		
	* docs: add openim docs Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * docs: add openim images test Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: add openim flag api configpath env set Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim push logger Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> --------- Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
		
			
				
	
	
		
			205 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			205 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package main
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"log"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/OpenIMSDK/protocol/constant"
 | |
| 	"github.com/OpenIMSDK/protocol/msg"
 | |
| 	"github.com/OpenIMSDK/protocol/sdkws"
 | |
| 	"github.com/OpenIMSDK/tools/mw"
 | |
| 	"github.com/golang/protobuf/proto"
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/credentials/insecure"
 | |
| 
 | |
| 	"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
 | |
| 	pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
 | |
| )
 | |
| 
 | |
| func main() {
 | |
| 
 | |
| 	var (
 | |
| 		topic       = "ws2ms_chat"      // v2版本配置文件kafka.topic.ws2ms_chat
 | |
| 		kafkaAddr   = "127.0.0.1:9092"  // v2版本配置文件kafka.topic.addr
 | |
| 		rpcAddr     = "127.0.0.1:10130" // v3版本配置文件rpcPort.openImMessagePort
 | |
| 		adminUserID = "openIM123456"    // v3版本管理员userID
 | |
| 		concurrency = 1                 // 并发数量
 | |
| 	)
 | |
| 
 | |
| 	getRpcConn := func() (*grpc.ClientConn, error) {
 | |
| 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 | |
| 		defer cancel()
 | |
| 		return grpc.DialContext(ctx, rpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient())
 | |
| 	}
 | |
| 	conn, err := getRpcConn()
 | |
| 	if err != nil {
 | |
| 		log.Println("get rpc conn", err)
 | |
| 		return
 | |
| 	}
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	msgClient := msg.NewMsgClient(conn)
 | |
| 
 | |
| 	conf := sarama.NewConfig()
 | |
| 	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
 | |
| 
 | |
| 	consumer, err := sarama.NewConsumer([]string{kafkaAddr}, conf)
 | |
| 	if err != nil {
 | |
| 		log.Println("kafka consumer conn", err)
 | |
| 		return
 | |
| 	}
 | |
| 	partitions, err := consumer.Partitions(topic) // Get all partitions according to topic
 | |
| 	if err != nil {
 | |
| 		log.Println("kafka partitions", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if len(partitions) == 0 {
 | |
| 		log.Println("kafka partitions is empty")
 | |
| 		return
 | |
| 	}
 | |
| 	log.Println("kafka partitions", partitions)
 | |
| 
 | |
| 	msgCh := make(chan *pbmsg.MsgDataToMQ, concurrency*2)
 | |
| 
 | |
| 	var kfkWg sync.WaitGroup
 | |
| 
 | |
| 	distinct := make(map[string]struct{})
 | |
| 	var lock sync.Mutex
 | |
| 
 | |
| 	for _, partition := range partitions {
 | |
| 		kfkWg.Add(1)
 | |
| 		go func(partition int32) {
 | |
| 			defer kfkWg.Done()
 | |
| 			pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
 | |
| 			if err != nil {
 | |
| 				log.Printf("kafka Consume Partition %d failed %s\n", partition, err)
 | |
| 				return
 | |
| 			}
 | |
| 			defer pc.Close()
 | |
| 			ch := pc.Messages()
 | |
| 			for {
 | |
| 				select {
 | |
| 				case <-time.After(time.Second * 10): // 10s读取不到就关闭
 | |
| 					return
 | |
| 				case message, ok := <-ch:
 | |
| 					if !ok {
 | |
| 						return
 | |
| 					}
 | |
| 					msgFromMQV2 := pbmsg.MsgDataToMQ{}
 | |
| 					err := proto.Unmarshal(message.Value, &msgFromMQV2)
 | |
| 					if err != nil {
 | |
| 						log.Printf("kafka msg partition %d offset %d unmarshal failed %s\n", message.Partition, message.Offset, message.Value)
 | |
| 						continue
 | |
| 					}
 | |
| 					if msgFromMQV2.MsgData == nil || msgFromMQV2.OperationID == "" {
 | |
| 						continue
 | |
| 					}
 | |
| 					if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
 | |
| 						continue
 | |
| 					}
 | |
| 					lock.Lock()
 | |
| 					_, exist := distinct[msgFromMQV2.MsgData.ClientMsgID]
 | |
| 					if !exist {
 | |
| 						distinct[msgFromMQV2.MsgData.ClientMsgID] = struct{}{}
 | |
| 					}
 | |
| 					lock.Unlock()
 | |
| 					if exist {
 | |
| 						continue
 | |
| 					}
 | |
| 					msgCh <- &msgFromMQV2
 | |
| 				}
 | |
| 			}
 | |
| 		}(partition)
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		kfkWg.Wait()
 | |
| 		close(msgCh)
 | |
| 	}()
 | |
| 
 | |
| 	var msgWg sync.WaitGroup
 | |
| 
 | |
| 	var (
 | |
| 		success int64
 | |
| 		failed  int64
 | |
| 	)
 | |
| 	for i := 0; i < concurrency; i++ {
 | |
| 		msgWg.Add(1)
 | |
| 		go func() {
 | |
| 			defer msgWg.Done()
 | |
| 			for message := range msgCh {
 | |
| 				HandlerV2Msg(msgClient, adminUserID, message, &success, &failed)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	msgWg.Wait()
 | |
| 	log.Printf("total %d success %d failed %d\n", success+failed, success, failed)
 | |
| }
 | |
| 
 | |
| func HandlerV2Msg(msgClient msg.MsgClient, adminUserID string, msgFromMQV2 *pbmsg.MsgDataToMQ, success *int64, failed *int64) {
 | |
| 	msgData := &sdkws.MsgData{
 | |
| 		SendID:           msgFromMQV2.MsgData.SendID,
 | |
| 		RecvID:           msgFromMQV2.MsgData.RecvID,
 | |
| 		GroupID:          msgFromMQV2.MsgData.GroupID,
 | |
| 		ClientMsgID:      msgFromMQV2.MsgData.ClientMsgID,
 | |
| 		ServerMsgID:      msgFromMQV2.MsgData.ServerMsgID,
 | |
| 		SenderPlatformID: msgFromMQV2.MsgData.SenderPlatformID,
 | |
| 		SenderNickname:   msgFromMQV2.MsgData.SenderNickname,
 | |
| 		SenderFaceURL:    msgFromMQV2.MsgData.SenderFaceURL,
 | |
| 		SessionType:      msgFromMQV2.MsgData.SessionType,
 | |
| 		MsgFrom:          msgFromMQV2.MsgData.MsgFrom,
 | |
| 		ContentType:      msgFromMQV2.MsgData.ContentType,
 | |
| 		SendTime:         msgFromMQV2.MsgData.SendTime,
 | |
| 		CreateTime:       msgFromMQV2.MsgData.CreateTime,
 | |
| 		Status:           msgFromMQV2.MsgData.Status,
 | |
| 		IsRead:           false,
 | |
| 		Options:          msgFromMQV2.MsgData.Options,
 | |
| 		AtUserIDList:     msgFromMQV2.MsgData.AtUserIDList,
 | |
| 		AttachedInfo:     msgFromMQV2.MsgData.AttachedInfo,
 | |
| 		Ex:               msgFromMQV2.MsgData.Ex,
 | |
| 	}
 | |
| 
 | |
| 	if msgFromMQV2.MsgData.OfflinePushInfo != nil {
 | |
| 		msgData.OfflinePushInfo = &sdkws.OfflinePushInfo{
 | |
| 			Title:         msgFromMQV2.MsgData.OfflinePushInfo.Title,
 | |
| 			Desc:          msgFromMQV2.MsgData.OfflinePushInfo.Desc,
 | |
| 			Ex:            msgFromMQV2.MsgData.OfflinePushInfo.Ex,
 | |
| 			IOSPushSound:  msgFromMQV2.MsgData.OfflinePushInfo.IOSPushSound,
 | |
| 			IOSBadgeCount: msgFromMQV2.MsgData.OfflinePushInfo.IOSBadgeCount,
 | |
| 			SignalInfo:    "",
 | |
| 		}
 | |
| 	}
 | |
| 	switch msgData.ContentType {
 | |
| 	case constant.Text:
 | |
| 		data, err := json.Marshal(apistruct.TextElem{
 | |
| 			Content: string(msgFromMQV2.MsgData.Content),
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		msgData.Content = data
 | |
| 	default:
 | |
| 		msgData.Content = msgFromMQV2.MsgData.Content
 | |
| 	}
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 | |
| 	defer cancel()
 | |
| 	ctx = context.WithValue(context.Background(), constant.OperationID, msgFromMQV2.OperationID)
 | |
| 	ctx = context.WithValue(ctx, constant.OpUserID, adminUserID)
 | |
| 
 | |
| 	resp, err := msgClient.SendMsg(ctx, &msg.SendMsgReq{MsgData: msgData})
 | |
| 	if err != nil {
 | |
| 		atomic.AddInt64(failed, 1)
 | |
| 		log.Printf("send msg %+v failed %s\n", msgData, err)
 | |
| 		return
 | |
| 	}
 | |
| 	atomic.AddInt64(success, 1)
 | |
| 	log.Printf("send msg success %+v resp %+v\n", msgData, resp)
 | |
| }
 |