diff --git a/src/common/constant/constant.go b/src/common/constant/constant.go new file mode 100644 index 000000000..9a90b96d6 --- /dev/null +++ b/src/common/constant/constant.go @@ -0,0 +1,46 @@ +package constant + +const ( + + //group admin + GroupAdmin = 1 + //feiend related + BlackListFlag = 1 + NotFriendFlag = 0 + FriendFlag = 1 + + //Websocket Protocol + WSGetNewestSeq = 1001 + WSPullMsg = 1002 + WSSendMsg = 1003 + WSPushMsg = 2001 + + ///ContentType + //UserRelated + Text = 101 + Picture = 102 + Voice = 103 + Video = 104 + File = 105 + + SyncSenderMsg = 108 + //SysRelated + AddFriendTip = 201 + AgreeAddFriendTip = 202 + KickOnlineTip = 203 + + //MsgFrom + UserMsgType = 100 + SysMsgType = 200 + + //SessionType + SingleChatType = 1 + GroupChatType = 2 +) + +var ContentType2PushContent = map[int64]string{ + Picture: "[picture]", + Voice: "[voice]", + Video: "[video]", + File: "[file]", +} diff --git a/src/common/kafka/consumer.go b/src/common/kafka/consumer.go new file mode 100644 index 000000000..eed6ef142 --- /dev/null +++ b/src/common/kafka/consumer.go @@ -0,0 +1,36 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "sync" +) + +type Consumer struct { + addr []string + WG sync.WaitGroup + Topic string + PartitionList []int32 + Consumer sarama.Consumer +} + +func NewKafkaConsumer(addr []string, topic string) *Consumer { + p := Consumer{} + p.Topic = topic + p.addr = addr + + consumer, err := sarama.NewConsumer(p.addr, nil) + if err != nil { + panic(err) + return nil + } + p.Consumer = consumer + + partitionList, err := consumer.Partitions(p.Topic) + if err != nil { + panic(err) + return nil + } + p.PartitionList = partitionList + + return &p +} diff --git a/src/common/kafka/consumer_group.go b/src/common/kafka/consumer_group.go new file mode 100644 index 000000000..4c4af5033 --- /dev/null +++ b/src/common/kafka/consumer_group.go @@ -0,0 +1,53 @@ +/* +** description(""). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/5/11 9:36). + */ +package kafka + +import ( + "context" + "github.com/Shopify/sarama" +) + +type MConsumerGroup struct { + sarama.ConsumerGroup + groupID string + topics []string +} + +type MConsumerGroupConfig struct { + KafkaVersion sarama.KafkaVersion + OffsetsInitial int64 + IsReturnErr bool +} + +func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { + config := sarama.NewConfig() + config.Version = consumerConfig.KafkaVersion + config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + config.Consumer.Return.Errors = consumerConfig.IsReturnErr + client, err := sarama.NewClient(addr, config) + if err != nil { + panic(err) + } + consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + if err != nil { + panic(err) + } + return &MConsumerGroup{ + consumerGroup, + groupID, + topics, + } +} +func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) { + ctx := context.Background() + for { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if err != nil { + panic(err) + } + } +} diff --git a/src/common/kafka/producer.go b/src/common/kafka/producer.go new file mode 100644 index 000000000..d2071eab6 --- /dev/null +++ b/src/common/kafka/producer.go @@ -0,0 +1,49 @@ +package kafka + +import ( + log2 "Open_IM/src/common/log" + "github.com/Shopify/sarama" + "github.com/golang/protobuf/proto" +) + +type Producer struct { + topic string + addr []string + config *sarama.Config + producer sarama.SyncProducer +} + +func NewKafkaProducer(addr []string, topic string) *Producer { + p := Producer{} + p.config = sarama.NewConfig() //Instantiate a sarama Config + p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all + p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly + + p.addr = addr + p.topic = topic + + producer, err := sarama.NewSyncProducer(p.addr, p.config) //初始化客户端 + if err != nil { + panic(err) + return nil + } + p.producer = producer + return &p +} + +func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) { + kMsg := &sarama.ProducerMessage{} + kMsg.Topic = p.topic + if len(key) == 1 { + kMsg.Key = sarama.StringEncoder(key[0]) + } + bMsg, err := proto.Marshal(m) + if err != nil { + log2.Error("", "", "proto marshal err = %s", err.Error()) + return -1, -1, err + } + kMsg.Value = sarama.ByteEncoder(bMsg) + + return p.producer.SendMessage(kMsg) +} diff --git a/src/common/log/es_hk.go b/src/common/log/es_hk.go new file mode 100644 index 000000000..e52d988fd --- /dev/null +++ b/src/common/log/es_hk.go @@ -0,0 +1,108 @@ +/* +** description("Hook to send logs to elasticsearch"). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/3/26 17:05). + */ +package log + +import ( + "Open_IM/src/common/config" + "context" + "fmt" + elasticV7 "github.com/olivere/elastic/v7" + "github.com/sirupsen/logrus" + "log" + "os" + "strings" + "time" +) + +//esHook custom es hook +type esHook struct { + moduleName string + client *elasticV7.Client +} + +//newEsHook initialization +func newEsHook(moduleName string) *esHook { + //https://github.com/sohlich/elogrus + //client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200")) + //if err != nil { + // log.Panic(err) + //} + //hook, err := elogrus.NewAsyncElasticHook(client, "localhost", logrus.DebugLevel, "mylog") + //if err != nil { + // log.Panic(err) + //} + es, err := elasticV7.NewClient( + elasticV7.SetURL(config.Config.Log.ElasticSearchAddr...), + elasticV7.SetBasicAuth(config.Config.Log.ElasticSearchUser, config.Config.Log.ElasticSearchPassword), + elasticV7.SetSniff(false), + elasticV7.SetHealthcheckInterval(60*time.Second), + elasticV7.SetErrorLog(log.New(os.Stderr, "ES:", log.LstdFlags)), + ) + + if err != nil { + log.Fatal("failed to create Elastic V7 Client: ", err) + } + + //info, code, err := es.Ping(logConfig.ElasticSearch.EsAddr[0]).Do(context.Background()) + //if err != nil { + // panic(err) + //} + //fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number) + // + //esversion, err := es.ElasticsearchVersion(logConfig.ElasticSearch.EsAddr[0]) + //if err != nil { + // panic(err) + //} + //fmt.Printf("Elasticsearch version %s\n", esversion) + return &esHook{client: es, moduleName: moduleName} +} + +//Fire log hook interface method +func (hook *esHook) Fire(entry *logrus.Entry) error { + doc := newEsLog(entry) + go hook.sendEs(doc) + return nil +} + +//Levels log hook interface method, the log affected by this hook +func (hook *esHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +//sendEs Asynchronously send logs to es +func (hook *esHook) sendEs(doc appLogDocModel) { + defer func() { + if r := recover(); r != nil { + fmt.Println("send entry to es failed: ", r) + } + }() + _, err := hook.client.Index().Index(hook.moduleName).Type(doc.indexName()).BodyJson(doc).Do(context.Background()) + if err != nil { + log.Println(err) + } + +} + +//appLogDocModel es model +type appLogDocModel map[string]interface{} + +func newEsLog(e *logrus.Entry) appLogDocModel { + ins := make(map[string]interface{}) + ins["level"] = strings.ToUpper(e.Level.String()) + ins["time"] = e.Time.Format("2006-01-02 15:04:05") + for kk, vv := range e.Data { + ins[kk] = vv + } + ins["tipInfo"] = e.Message + + return ins +} + +// indexName es index name time division +func (m *appLogDocModel) indexName() string { + return time.Now().Format("2006-01-02") +} diff --git a/src/common/log/file_line_hk.go b/src/common/log/file_line_hk.go new file mode 100644 index 000000000..c9633c4b9 --- /dev/null +++ b/src/common/log/file_line_hk.go @@ -0,0 +1,60 @@ +/* +** description("Get the hook of the calling file name and line number"). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/3/16 11:26). + */ +package log + +import ( + "fmt" + "github.com/sirupsen/logrus" + "runtime" + "strings" +) + +type fileHook struct{} + +func newFileHook() *fileHook { + return &fileHook{} +} + +func (f *fileHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func (f *fileHook) Fire(entry *logrus.Entry) error { + entry.Data["FilePath"] = findCaller(5) + return nil +} + +func findCaller(skip int) string { + file := "" + line := 0 + for i := 0; i < 10; i++ { + file, line = getCaller(skip + i) + if !strings.HasPrefix(file, "log") { + break + } + } + return fmt.Sprintf("%s:%d", file, line) +} + +func getCaller(skip int) (string, int) { + _, file, line, ok := runtime.Caller(skip) + if !ok { + return "", 0 + } + + n := 0 + for i := len(file) - 1; i > 0; i-- { + if file[i] == '/' { + n++ + if n >= 2 { + file = file[i+1:] + break + } + } + } + return file, line +} diff --git a/src/common/log/logrus.go b/src/common/log/logrus.go new file mode 100644 index 000000000..6ace5f963 --- /dev/null +++ b/src/common/log/logrus.go @@ -0,0 +1,193 @@ +package log + +import ( + "Open_IM/src/common/config" + "fmt" + nested "github.com/antonfisher/nested-logrus-formatter" + rotatelogs "github.com/lestrrat-go/file-rotatelogs" + "github.com/rifflock/lfshook" + "github.com/sirupsen/logrus" + "os" + "time" +) + +var logger *Logger + +type Logger struct { + *logrus.Logger + Pid int +} + +func init() { + logger = loggerInit("") + +} +func NewPrivateLog(moduleName string) { + logger = loggerInit(moduleName) +} + +func loggerInit(moduleName string) *Logger { + var logger = logrus.New() + //All logs will be printed + logger.SetLevel(logrus.TraceLevel) + //Log Style Setting + logger.SetFormatter(&nested.Formatter{ + TimestampFormat: "2006-01-02 15:04:05", + HideKeys: false, + FieldsOrder: []string{"PID"}, + }) + //File name and line number display hook + logger.AddHook(newFileHook()) + + //Send logs to elasticsearch hook + if config.Config.Log.ElasticSearchSwitch == true { + logger.AddHook(newEsHook(moduleName)) + } + //Log file segmentation hook + hook := NewLfsHook(config.Config.Log.StorageLocation+time.Now().Format("2006-01-02")+".log", 0, 5, moduleName) + logger.AddHook(hook) + return &Logger{ + logger, + os.Getpid(), + } +} +func NewLfsHook(logName string, rotationTime time.Duration, maxRemainNum uint, moduleName string) logrus.Hook { + var fileNameSuffix string + if GetCurrentTimestamp() >= GetCurDayZeroTimestamp() && GetCurrentTimestamp() <= GetCurDayHalfTimestamp() { + fileNameSuffix = time.Now().Format("2006-01-02") + ".log" + } else { + fileNameSuffix = time.Now().Format("2006-01-02") + ".log" + } + writer, err := rotatelogs.New( + logName, + rotatelogs.WithRotationCount(maxRemainNum), + ) + if err != nil { + panic(err) + } + writeInfo, err := rotatelogs.New( + config.Config.Log.StorageLocation+moduleName+"/info."+fileNameSuffix, + rotatelogs.WithRotationTime(time.Duration(60)*time.Second), + rotatelogs.WithRotationCount(maxRemainNum), + ) + writeError, err := rotatelogs.New( + config.Config.Log.StorageLocation+moduleName+"/error."+fileNameSuffix, + rotatelogs.WithRotationTime(time.Minute), + rotatelogs.WithRotationCount(maxRemainNum), + ) + writeDebug, err := rotatelogs.New( + config.Config.Log.StorageLocation+moduleName+"/debug."+fileNameSuffix, + rotatelogs.WithRotationCount(maxRemainNum), + ) + writeWarn, err := rotatelogs.New( + config.Config.Log.StorageLocation+moduleName+"/warn."+fileNameSuffix, + rotatelogs.WithRotationTime(time.Minute), + rotatelogs.WithRotationCount(maxRemainNum), + ) + if err != nil { + panic(err) + } + lfsHook := lfshook.NewHook(lfshook.WriterMap{ + logrus.DebugLevel: writeDebug, + logrus.InfoLevel: writeInfo, + logrus.WarnLevel: writeWarn, + logrus.ErrorLevel: writeError, + logrus.FatalLevel: writer, + logrus.PanicLevel: writer, + }, &nested.Formatter{ + TimestampFormat: "2006-01-02 15:04:05", + HideKeys: false, + FieldsOrder: []string{"PID"}, + }) + + return lfsHook +} + +func Info(token, OperationID, format string, args ...interface{}) { + if token == "" && OperationID == "" { + logger.WithFields(logrus.Fields{}).Infof(format, args...) + } else { + logger.WithFields(logrus.Fields{ + "token": token, + "OperationID": OperationID, + }).Infof(format, args...) + } +} + +func Error(token, OperationID, format string, args ...interface{}) { + if token == "" && OperationID == "" { + logger.WithFields(logrus.Fields{}).Errorf(format, args...) + } else { + logger.WithFields(logrus.Fields{ + "token": token, + "OperationID": OperationID, + }).Errorf(format, args...) + } +} + +func Debug(token, OperationID, format string, args ...interface{}) { + if token == "" && OperationID == "" { + logger.WithFields(logrus.Fields{}).Debugf(format, args...) + } else { + logger.WithFields(logrus.Fields{ + "token": token, + "OperationID": OperationID, + }).Debugf(format, args...) + } +} + +func Warning(token, OperationID, format string, args ...interface{}) { + if token == "" && OperationID == "" { + logger.WithFields(logrus.Fields{}).Warningf(format, args...) + } else { + logger.WithFields(logrus.Fields{ + "token": token, + "OperationID": OperationID, + }).Warningf(format, args...) + } +} + +func InfoByArgs(format string, args ...interface{}) { + logger.WithFields(logrus.Fields{}).Infof(format, args) +} + +func ErrorByArgs(format string, args ...interface{}) { + logger.WithFields(logrus.Fields{}).Errorf(format, args...) +} + +//Print log information in k, v format, +//kv is best to appear in pairs. tipInfo is the log prompt information for printing, +//and kv is the key and value for printing. +func InfoByKv(tipInfo, OperationID string, args ...interface{}) { + fields := make(logrus.Fields) + argsHandle(OperationID, fields, args) + logger.WithFields(fields).Info(tipInfo) +} +func ErrorByKv(tipInfo, OperationID string, args ...interface{}) { + fields := make(logrus.Fields) + argsHandle(OperationID, fields, args) + logger.WithFields(fields).Error(tipInfo) +} +func DebugByKv(tipInfo, OperationID string, args ...interface{}) { + fields := make(logrus.Fields) + argsHandle(OperationID, fields, args) + logger.WithFields(fields).Debug(tipInfo) +} +func WarnByKv(tipInfo, OperationID string, args ...interface{}) { + fields := make(logrus.Fields) + argsHandle(OperationID, fields, args) + logger.WithFields(fields).Warn(tipInfo) +} + +//internal method +func argsHandle(OperationID string, fields logrus.Fields, args []interface{}) { + for i := 0; i < len(args); i += 2 { + if i+1 < len(args) { + fields[fmt.Sprintf("%v", args[i])] = args[i+1] + } else { + fields[fmt.Sprintf("%v", args[i])] = "" + } + } + fields["operationID"] = OperationID + fields["PID"] = logger.Pid +} diff --git a/src/common/log/time_format.go b/src/common/log/time_format.go new file mode 100644 index 000000000..4ae1c56ef --- /dev/null +++ b/src/common/log/time_format.go @@ -0,0 +1,57 @@ +/* +** description(""). +** copyright('tuoyun,www.tuoyun.net'). +** author("fg,Gordon@tuoyun.net"). +** time(2021/2/22 11:52). + */ +package log + +import ( + "strconv" + "time" +) + +const ( + TimeOffset = 8 * 3600 //8个小时的偏移量 + HalfOffset = 12 * 3600 //半天的小时偏移量 +) + +//获取当前的时间戳 +func GetCurrentTimestamp() int64 { + return time.Now().Unix() +} + +//获取当天0点的时间戳 +func GetCurDayZeroTimestamp() int64 { + timeStr := time.Now().Format("2006-01-02") + t, _ := time.Parse("2006-01-02", timeStr) + return t.Unix() - TimeOffset +} + +//获取当天12点的时间戳 +func GetCurDayHalfTimestamp() int64 { + return GetCurDayZeroTimestamp() + HalfOffset + +} + +//获取当天0点格式化时间,格式为"2006-01-02_00-00-00" +func GetCurDayZeroTimeFormat() string { + return time.Unix(GetCurDayZeroTimestamp(), 0).Format("2006-01-02_15-04-05") +} + +//获取当天12点格式化时间,格式为"2006-01-02_12-00-00" +func GetCurDayHalfTimeFormat() string { + return time.Unix(GetCurDayZeroTimestamp()+HalfOffset, 0).Format("2006-01-02_15-04-05") +} +func GetTimeStampByFormat(datetime string) string { + timeLayout := "2006-01-02 15:04:05" //转化所需模板 + loc, _ := time.LoadLocation("Local") //获取时区 + tmp, _ := time.ParseInLocation(timeLayout, datetime, loc) + timestamp := tmp.Unix() //转化为时间戳 类型是int64 + return strconv.FormatInt(timestamp, 10) +} + +func TimeStringFormatTimeUnix(timeFormat string, timeSrc string) int64 { + tm, _ := time.Parse(timeFormat, timeSrc) + return tm.Unix() +}