mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-11-04 11:22:10 +08:00 
			
		
		
		
	* docs: add openim docs Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * docs: add openim images test Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: add openim flag api configpath env set Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim push logger Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> --------- Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
		
			
				
	
	
		
			205 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			205 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package main
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"log"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/IBM/sarama"
 | 
						|
	"github.com/OpenIMSDK/protocol/constant"
 | 
						|
	"github.com/OpenIMSDK/protocol/msg"
 | 
						|
	"github.com/OpenIMSDK/protocol/sdkws"
 | 
						|
	"github.com/OpenIMSDK/tools/mw"
 | 
						|
	"github.com/golang/protobuf/proto"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/credentials/insecure"
 | 
						|
 | 
						|
	"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
 | 
						|
	pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
 | 
						|
)
 | 
						|
 | 
						|
func main() {
 | 
						|
 | 
						|
	var (
 | 
						|
		topic       = "ws2ms_chat"      // v2版本配置文件kafka.topic.ws2ms_chat
 | 
						|
		kafkaAddr   = "127.0.0.1:9092"  // v2版本配置文件kafka.topic.addr
 | 
						|
		rpcAddr     = "127.0.0.1:10130" // v3版本配置文件rpcPort.openImMessagePort
 | 
						|
		adminUserID = "openIM123456"    // v3版本管理员userID
 | 
						|
		concurrency = 1                 // 并发数量
 | 
						|
	)
 | 
						|
 | 
						|
	getRpcConn := func() (*grpc.ClientConn, error) {
 | 
						|
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 | 
						|
		defer cancel()
 | 
						|
		return grpc.DialContext(ctx, rpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient())
 | 
						|
	}
 | 
						|
	conn, err := getRpcConn()
 | 
						|
	if err != nil {
 | 
						|
		log.Println("get rpc conn", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer conn.Close()
 | 
						|
 | 
						|
	msgClient := msg.NewMsgClient(conn)
 | 
						|
 | 
						|
	conf := sarama.NewConfig()
 | 
						|
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
 | 
						|
 | 
						|
	consumer, err := sarama.NewConsumer([]string{kafkaAddr}, conf)
 | 
						|
	if err != nil {
 | 
						|
		log.Println("kafka consumer conn", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	partitions, err := consumer.Partitions(topic) // Get all partitions according to topic
 | 
						|
	if err != nil {
 | 
						|
		log.Println("kafka partitions", err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if len(partitions) == 0 {
 | 
						|
		log.Println("kafka partitions is empty")
 | 
						|
		return
 | 
						|
	}
 | 
						|
	log.Println("kafka partitions", partitions)
 | 
						|
 | 
						|
	msgCh := make(chan *pbmsg.MsgDataToMQ, concurrency*2)
 | 
						|
 | 
						|
	var kfkWg sync.WaitGroup
 | 
						|
 | 
						|
	distinct := make(map[string]struct{})
 | 
						|
	var lock sync.Mutex
 | 
						|
 | 
						|
	for _, partition := range partitions {
 | 
						|
		kfkWg.Add(1)
 | 
						|
		go func(partition int32) {
 | 
						|
			defer kfkWg.Done()
 | 
						|
			pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
 | 
						|
			if err != nil {
 | 
						|
				log.Printf("kafka Consume Partition %d failed %s\n", partition, err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
			defer pc.Close()
 | 
						|
			ch := pc.Messages()
 | 
						|
			for {
 | 
						|
				select {
 | 
						|
				case <-time.After(time.Second * 10): // 10s读取不到就关闭
 | 
						|
					return
 | 
						|
				case message, ok := <-ch:
 | 
						|
					if !ok {
 | 
						|
						return
 | 
						|
					}
 | 
						|
					msgFromMQV2 := pbmsg.MsgDataToMQ{}
 | 
						|
					err := proto.Unmarshal(message.Value, &msgFromMQV2)
 | 
						|
					if err != nil {
 | 
						|
						log.Printf("kafka msg partition %d offset %d unmarshal failed %s\n", message.Partition, message.Offset, message.Value)
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if msgFromMQV2.MsgData == nil || msgFromMQV2.OperationID == "" {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					lock.Lock()
 | 
						|
					_, exist := distinct[msgFromMQV2.MsgData.ClientMsgID]
 | 
						|
					if !exist {
 | 
						|
						distinct[msgFromMQV2.MsgData.ClientMsgID] = struct{}{}
 | 
						|
					}
 | 
						|
					lock.Unlock()
 | 
						|
					if exist {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					msgCh <- &msgFromMQV2
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}(partition)
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		kfkWg.Wait()
 | 
						|
		close(msgCh)
 | 
						|
	}()
 | 
						|
 | 
						|
	var msgWg sync.WaitGroup
 | 
						|
 | 
						|
	var (
 | 
						|
		success int64
 | 
						|
		failed  int64
 | 
						|
	)
 | 
						|
	for i := 0; i < concurrency; i++ {
 | 
						|
		msgWg.Add(1)
 | 
						|
		go func() {
 | 
						|
			defer msgWg.Done()
 | 
						|
			for message := range msgCh {
 | 
						|
				HandlerV2Msg(msgClient, adminUserID, message, &success, &failed)
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
 | 
						|
	msgWg.Wait()
 | 
						|
	log.Printf("total %d success %d failed %d\n", success+failed, success, failed)
 | 
						|
}
 | 
						|
 | 
						|
func HandlerV2Msg(msgClient msg.MsgClient, adminUserID string, msgFromMQV2 *pbmsg.MsgDataToMQ, success *int64, failed *int64) {
 | 
						|
	msgData := &sdkws.MsgData{
 | 
						|
		SendID:           msgFromMQV2.MsgData.SendID,
 | 
						|
		RecvID:           msgFromMQV2.MsgData.RecvID,
 | 
						|
		GroupID:          msgFromMQV2.MsgData.GroupID,
 | 
						|
		ClientMsgID:      msgFromMQV2.MsgData.ClientMsgID,
 | 
						|
		ServerMsgID:      msgFromMQV2.MsgData.ServerMsgID,
 | 
						|
		SenderPlatformID: msgFromMQV2.MsgData.SenderPlatformID,
 | 
						|
		SenderNickname:   msgFromMQV2.MsgData.SenderNickname,
 | 
						|
		SenderFaceURL:    msgFromMQV2.MsgData.SenderFaceURL,
 | 
						|
		SessionType:      msgFromMQV2.MsgData.SessionType,
 | 
						|
		MsgFrom:          msgFromMQV2.MsgData.MsgFrom,
 | 
						|
		ContentType:      msgFromMQV2.MsgData.ContentType,
 | 
						|
		SendTime:         msgFromMQV2.MsgData.SendTime,
 | 
						|
		CreateTime:       msgFromMQV2.MsgData.CreateTime,
 | 
						|
		Status:           msgFromMQV2.MsgData.Status,
 | 
						|
		IsRead:           false,
 | 
						|
		Options:          msgFromMQV2.MsgData.Options,
 | 
						|
		AtUserIDList:     msgFromMQV2.MsgData.AtUserIDList,
 | 
						|
		AttachedInfo:     msgFromMQV2.MsgData.AttachedInfo,
 | 
						|
		Ex:               msgFromMQV2.MsgData.Ex,
 | 
						|
	}
 | 
						|
 | 
						|
	if msgFromMQV2.MsgData.OfflinePushInfo != nil {
 | 
						|
		msgData.OfflinePushInfo = &sdkws.OfflinePushInfo{
 | 
						|
			Title:         msgFromMQV2.MsgData.OfflinePushInfo.Title,
 | 
						|
			Desc:          msgFromMQV2.MsgData.OfflinePushInfo.Desc,
 | 
						|
			Ex:            msgFromMQV2.MsgData.OfflinePushInfo.Ex,
 | 
						|
			IOSPushSound:  msgFromMQV2.MsgData.OfflinePushInfo.IOSPushSound,
 | 
						|
			IOSBadgeCount: msgFromMQV2.MsgData.OfflinePushInfo.IOSBadgeCount,
 | 
						|
			SignalInfo:    "",
 | 
						|
		}
 | 
						|
	}
 | 
						|
	switch msgData.ContentType {
 | 
						|
	case constant.Text:
 | 
						|
		data, err := json.Marshal(apistruct.TextElem{
 | 
						|
			Content: string(msgFromMQV2.MsgData.Content),
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		msgData.Content = data
 | 
						|
	default:
 | 
						|
		msgData.Content = msgFromMQV2.MsgData.Content
 | 
						|
	}
 | 
						|
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 | 
						|
	defer cancel()
 | 
						|
	ctx = context.WithValue(context.Background(), constant.OperationID, msgFromMQV2.OperationID)
 | 
						|
	ctx = context.WithValue(ctx, constant.OpUserID, adminUserID)
 | 
						|
 | 
						|
	resp, err := msgClient.SendMsg(ctx, &msg.SendMsgReq{MsgData: msgData})
 | 
						|
	if err != nil {
 | 
						|
		atomic.AddInt64(failed, 1)
 | 
						|
		log.Printf("send msg %+v failed %s\n", msgData, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	atomic.AddInt64(success, 1)
 | 
						|
	log.Printf("send msg success %+v resp %+v\n", msgData, resp)
 | 
						|
}
 |