logrus hook输出日志到本地磁盘的操作

logrus是Go的一个日志框架,它最让人激动的应该是hook机制:可以在初始化时为logrus添加hook,从而实现各种扩展功能,例如将日志输出到elasticsearch、activemq等中间件,甚至可以输出到你的email和钉钉中去。不要问我为什么会发现可以输出到钉钉中去,都是泪,手动笑哭!

言归正传,这里就简单地通过hook机制将日志输出到本地磁盘。

首先

go get github.com/sirupsen/logrus

然后

logrus常用的日志等级有以下6个(此外还有更详细的Trace等级),可以直接调用

logrus.Debug("Useful debugging information.")

logrus.Info("Something noteworthy happened!")

logrus.Warn("You should probably take a look at this.")

logrus.Error("Something failed but I'm not quitting.")

logrus.Fatal("Bye.") // calls os.Exit(1) after logging

logrus.Panic("I'm bailing.") // calls panic() after logging

项目例子结构

main.go

package main

import (

"fmt"

"github.com/sirupsen/logrus"

"logT/logS"

)

// main demonstrates registering the file hook on the standard logrus logger.
func main() {
	// Build the hook; the argument is the path of the log file.
	hook := logS.NewHook("d:/log/golog.log")

	// Log once before the hook is registered.
	logrus.WithField("file", "d:/log/golog.log").Info("New logrus hook err.")

	// NewHook returns nil when the file writer cannot be initialised.
	// Registering a nil *Hook would make the first Fire call dereference a
	// nil pointer, so only add the hook when it was actually created.
	if hook != nil {
		logrus.AddHook(hook)
	} else {
		fmt.Println("create log file hook failed, logging without it")
	}

	// Log once after the hook is registered.
	logrus.WithFields(logrus.Fields{
		"animal": "walrus",
	}).Info("A walrus appears")
}

hook.go

不要看下面三个go文件代码很长,其实大多数都是固定代码,也就NewHook函数自己扩展定义就好

package logS

import (

"fmt"

"github.com/sirupsen/logrus"

"os"

"strings"

)

// Hook is a logrus hook that writes log entries to a file.

type Hook struct {

// W is the writer that Fire forwards the formatted messages to.
W LoggerInterface

}

// NewHook creates a Hook that writes to the log file at path file.
//
// The underlying writer is configured through a JSON document carrying the
// file name and "maxdays":7. NewHook returns nil when the writer cannot be
// initialised; the cause is reported on os.Stderr instead of being dropped.
func NewHook(file string) (f *Hook) {
	w := NewFileWriter()

	// %q quotes and escapes the path, so backslashes or double quotes in it
	// (for example Windows paths) no longer corrupt the JSON document.
	config := fmt.Sprintf(`{"filename":%q,"maxdays":7}`, file)

	if err := w.Init(config); err != nil {
		fmt.Fprintf(os.Stderr, "logS: init file writer for %q: %v\n", file, err)
		return nil
	}
	return &Hook{w}
}

// Fire implements the Fire method of the logrus Hook interface.
//
// It formats the entry with getMessage, prefixes it with a level tag and
// hands it to the writer. Panic, Fatal and Error entries are all written as
// LevelError; levels not listed in the switch are ignored.
func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
	text, msgErr := getMessage(entry)
	if msgErr != nil {
		fmt.Fprintf(os.Stderr, "Unable to read entry, %v", msgErr)
		return msgErr
	}

	switch entry.Level {
	case logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s", text), LevelError)
	case logrus.WarnLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s", text), LevelWarn)
	case logrus.InfoLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s", text), LevelInfo)
	case logrus.DebugLevel:
		return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s", text), LevelDebug)
	}
	return nil
}

// Levels implements the Levels method of the logrus Hook interface.
// The hook is fired for Panic, Fatal, Error, Warn, Info and Debug entries.
func (hook *Hook) Levels() []logrus.Level {
	levels := make([]logrus.Level, 0, 6)
	levels = append(levels,
		logrus.PanicLevel,
		logrus.FatalLevel,
		logrus.ErrorLevel,
		logrus.WarnLevel,
		logrus.InfoLevel,
		logrus.DebugLevel,
	)
	return levels
}

// getMessage renders an entry as "file:line message key:value ...".
//
// The caller position comes from GetCallerIgnoringLogMulti; when the file
// path contains "$GOPATH/src/", only the part after it is kept. The returned
// error is always nil.
func getMessage(entry *logrus.Entry) (message string, err error) {
	// Must be called directly from this function: the depth argument counts
	// stack frames relative to it.
	file, lineNumber := GetCallerIgnoringLogMulti(2)
	if file != "" {
		prefix := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
		if parts := strings.Split(file, prefix); len(parts) >= 2 {
			file = parts[1]
		}
	}

	var b strings.Builder
	b.WriteString(fmt.Sprintf("%s:%d ", file, lineNumber))
	b.WriteString(fmt.Sprintf("%s ", entry.Message))
	for key, value := range entry.Data {
		b.WriteString(fmt.Sprintf("%v:%v ", key, value))
	}
	return b.String(), nil
}

caller.go

package logS

import (

"runtime"

"strings"

)

// GetCaller returns the file and line of the stack frame callDepth levels
// above the function that called GetCaller (0 means the caller itself).
//
// Frames whose file name ends with one of suffixesToIgnore are skipped and
// the search continues one frame further up. When the stack is exhausted it
// returns "???" and 0.
func GetCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
	// Start one frame up so that GetCaller's own frame is not reported.
	for depth := callDepth + 1; ; depth++ {
		_, frameFile, frameLine, ok := runtime.Caller(depth)
		if !ok {
			return "???", 0
		}

		ignored := false
		for _, suffix := range suffixesToIgnore {
			if strings.HasSuffix(frameFile, suffix) {
				ignored = true
				break
			}
		}
		if !ignored {
			return frameFile, frameLine
		}
	}
}

// GetCallerIgnoringLogMulti is GetCaller with a fixed ignore list: frames in
// logrus' hooks.go, entry.go, logger.go and exported.go, and in asm_amd64.s,
// are skipped.
func GetCallerIgnoringLogMulti(callDepth int) (string, int) {
	ignoredSuffixes := []string{
		"logrus/hooks.go",
		"logrus/entry.go",
		"logrus/logger.go",
		"logrus/exported.go",
		"asm_amd64.s",
	}
	// The +1 skips this function's own frame.
	return GetCaller(callDepth+1, ignoredSuffixes...)
}

file.go

package logS

import (

"encoding/json"

"errors"

"fmt"

"io/ioutil"

"log"

"os"

"path/filepath"

"strings"

"sync"

"time"

)

// Log levels accepted by LoggerInterface.WriteMsg, from LevelError (0) up to LevelDebug (3).

const (

LevelError = iota

LevelWarn

LevelInfo

LevelDebug

)

// LoggerInterface is the writer contract used by Hook.

type LoggerInterface interface {

// Init configures the writer from a configuration string.
Init(config string) error

// WriteMsg writes msg at the given level (one of the Level* constants).
WriteMsg(msg string, level int) error

// Destroy releases the writer's resources.
Destroy()

// Flush flushes the writer.
Flush()

}

// LogWriter implements LoggerInterface.

// It writes messages by lines limit, file size limit, or time frequency.

type LogWriter struct {

// Embedded standard logger; it writes through mw.
*log.Logger

// mw is the mutex-guarded file writer the embedded Logger writes to.
mw *MuxWriter

// Filename is the path of the log file.

Filename string `json:"filename"`

// Maxlines is the line count at which docheck rotates the file (0 disables the check).
Maxlines int `json:"maxlines"`

// maxlinesCurlines counts the lines of the current file.
maxlinesCurlines int

// Maxsize is the byte size at which docheck rotates the file (0 disables the check).

Maxsize int `json:"maxsize"`

// maxsizeCursize counts the bytes of the current file.
maxsizeCursize int

// Daily enables rotation when the day of month changes.

Daily bool `json:"daily"`

// Maxdays is the age in days after which deleteOldLog removes log files.
Maxdays int64 `json:"maxdays"`

// dailyOpendate is the day of month on which the current file was opened.
dailyOpendate int

// Rotate turns rotation on or off as a whole.
Rotate bool `json:"rotate"`

startLock sync.Mutex // held by docheck while it checks and rotates

// Level is the threshold: WriteMsg drops messages whose level is greater.
Level int `json:"level"`

}

// MuxWriter is an *os.File writer guarded by a mutex.
type MuxWriter struct {
	sync.Mutex
	fd *os.File
}

// Write writes b to the current file while holding the mutex.
func (l *MuxWriter) Write(b []byte) (n int, err error) {
	l.Lock()
	defer l.Unlock()
	n, err = l.fd.Write(b)
	return n, err
}

// SetFd installs fd as the file to write to, closing the previously
// installed file if there was one. It does not take the mutex itself.
func (l *MuxWriter) SetFd(fd *os.File) {
	if previous := l.fd; previous != nil {
		_ = previous.Close()
	}
	l.fd = fd
}

// NewFileWriter returns a LogWriter, as a LoggerInterface, with the default
// settings: 1,000,000 lines, 256 MB, daily rotation, 7 days retention,
// rotation enabled and level LevelDebug. The file name is left empty and
// must be supplied through Init.
func NewFileWriter() LoggerInterface {
	// The standard logger writes through a MuxWriter rather than the file
	// directly, so writes can be blocked while the file is rotated.
	mux := new(MuxWriter)

	return &LogWriter{
		Logger:   log.New(mux, "", log.Ldate|log.Ltime),
		mw:       mux,
		Filename: "",
		Maxlines: 1000000,
		Maxsize:  1 << 28, // 256 MB
		Daily:    true,
		Maxdays:  7,
		Rotate:   true,
		Level:    LevelDebug,
	}
}

// Init configures the writer from a JSON document and opens the log file.
//
// Example configuration:
//
//	{
//	  "filename": "logs/sample.log",
//	  "maxlines": 10000,
//	  "maxsize": 1073741824,
//	  "daily": true,
//	  "maxdays": 15,
//	  "rotate": true
//	}
//
// It returns an error when the JSON cannot be decoded, when no file name is
// configured, or when the log file cannot be opened.
func (w *LogWriter) Init(jsonconfig string) error {
	if err := json.Unmarshal([]byte(jsonconfig), w); err != nil {
		return err
	}
	if w.Filename == "" {
		return errors.New("jsonconfig must have filename")
	}
	return w.startLogger()
}

// startLogger opens the log file, installs it in the MuxWriter and
// initialises the size, line and date counters from it.
func (w *LogWriter) startLogger() error {
	file, err := w.createLogFile()
	if err != nil {
		return err
	}
	w.mw.SetFd(file)
	return w.initFd()
}

// docheck rotates the file when a limit is reached and then accounts for
// one more line of size bytes.
//
// Rotation happens only when Rotate is set and the line limit, the size
// limit or the day of month says so. When rotation fails, the error is
// printed to os.Stderr and the counters are left untouched.
func (w *LogWriter) docheck(size int) {
	w.startLock.Lock()
	defer w.startLock.Unlock()

	if w.Rotate {
		linesReached := w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines
		sizeReached := w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize
		dayChanged := w.Daily && time.Now().Day() != w.dailyOpendate

		if linesReached || sizeReached || dayChanged {
			if err := w.DoRotate(); err != nil {
				fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
				return
			}
		}
	}

	w.maxlinesCurlines++
	w.maxsizeCursize += size
}

// WriteMsg writes msg to the file unless level is above the configured
// threshold. It always returns nil.
func (w *LogWriter) WriteMsg(msg string, level int) error {
	if level <= w.Level {
		// 24 is the length of the prefix "2013/06/23 21:00:22 [T] ".
		w.docheck(24 + len(msg))
		w.Logger.Print(msg)
	}
	return nil
}

// createLogFile opens Filename for appending, creating it with mode 0660
// when it does not exist.
func (w *LogWriter) createLogFile() (*os.File, error) {
	const flags = os.O_CREATE | os.O_APPEND | os.O_WRONLY
	return os.OpenFile(w.Filename, flags, 0660)
}

// initFd initialises the rotation counters from the currently open file:
// the byte size, the day of month it was opened on, and the number of lines
// it already contains.
func (w *LogWriter) initFd() error {
	finfo, err := w.mw.fd.Stat()
	if err != nil {
		return fmt.Errorf("get stat err: %s", err)
	}

	w.maxsizeCursize = int(finfo.Size())
	w.dailyOpendate = time.Now().Day()
	w.maxlinesCurlines = 0

	if finfo.Size() > 0 {
		content, err := ioutil.ReadFile(w.Filename)
		if err != nil {
			return err
		}
		// Count line terminators. Splitting on "\n" and taking the length
		// reported one line too many for a file ending in a newline.
		w.maxlinesCurlines = strings.Count(string(content), "\n")
	}
	return nil
}

// DoRotate moves the current log file aside and starts a new one.
//
// The old file is renamed to "<Filename>.<yyyy-mm-dd>.<nnn>", where nnn is
// the first free number from 001 to 999, e.g. xx.log.2013-01-01.002. When
// Filename does not exist there is nothing to rotate and nil is returned.
// After a successful rotation old log files are deleted in the background.
func (w *LogWriter) DoRotate() error {
	if _, err := os.Lstat(w.Filename); err != nil {
		return nil
	}

	// Find the next available number.
	var (
		fname string
		err   error
	)
	for num := 1; err == nil && num <= 999; num++ {
		fname = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
		_, err = os.Lstat(fname)
	}
	// err == nil means even the last candidate already exists.
	if err == nil {
		return fmt.Errorf("Rotate: Cannot find free log number to rename %s", w.Filename)
	}

	// Block the Logger's io.Writer while the file is swapped.
	w.mw.Lock()
	defer w.mw.Unlock()

	// Close the file before renaming it.
	_ = w.mw.fd.Close()

	renameErr := os.Rename(w.Filename, fname)

	// Reopen the log file even when the rename failed. Otherwise the writer
	// would keep the closed descriptor and every later message would be lost.
	startErr := w.startLogger()

	if renameErr != nil {
		return fmt.Errorf("Rotate: %s", renameErr)
	}
	if startErr != nil {
		return fmt.Errorf("Rotate StartLogger: %s", startErr)
	}

	go w.deleteOldLog()
	return nil
}

// deleteOldLog removes files under the log file's directory whose base name
// starts with the log file's base name and whose modification time is older
// than Maxdays days. It is a best-effort cleanup: entries that cannot be
// inspected or removed are skipped.
func (w *LogWriter) deleteOldLog() {
	dir := filepath.Dir(w.Filename)
	prefix := filepath.Base(w.Filename)
	cutoff := time.Now().Unix() - 60*60*24*w.Maxdays

	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
		defer func() {
			if r := recover(); r != nil {
				returnErr = fmt.Errorf("Unable to delete old log '%s', error: %+v", path, r)
				fmt.Println(returnErr)
			}
		}()

		// Walk passes a non-nil err (and possibly a nil info) for entries it
		// could not stat or read. Skip them explicitly instead of panicking
		// on info, which used to abort the whole cleanup.
		if err != nil || info == nil {
			return nil
		}
		if info.IsDir() || info.ModTime().Unix() >= cutoff {
			return nil
		}
		if strings.HasPrefix(filepath.Base(path), prefix) {
			// Best effort: a file that cannot be removed is left in place.
			_ = os.Remove(path)
		}
		return nil
	})
}

// Destroy closes the underlying log file; the close error is ignored.
func (w *LogWriter) Destroy() {
	file := w.mw.fd
	_ = file.Close()
}

// Flush calls Sync on the underlying log file; the error is ignored.
// Messages are not buffered in memory by the writer, so there is nothing
// else to flush.
func (w *LogWriter) Flush() {
	file := w.mw.fd
	_ = file.Sync()
}

补充知识:golang logrus自定义hook:日志切片hook、邮件警报hook、kafkahook

logrus Hook 分析

logrus hook 接口定义很简单。如下

package logrus

// A hook to be fired when logging on the logging levels returned from

// `Levels()` on your implementation of the interface. Note that this is not

// fired in a goroutine or a channel with workers, you should handle such

// functionality yourself if your call is non-blocking and you don't wish for

// the logging calls for levels returned from `Levels()` to block.

type Hook interface {

// Levels returns the levels this hook is registered for.
Levels() []Level

// Fire is called with the entry being logged; a non-nil error stops LevelHooks.Fire.
Fire(*Entry) error

}

// LevelHooks is the internal type storing, per level, the hooks registered on a logger instance.

type LevelHooks map[Level][]Hook

// Add registers hook under every level reported by its Levels method.
// It is called as `log.Hooks.Add(new(MyHook))`, where `MyHook` implements
// the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
	levels := hook.Levels()
	for i := range levels {
		lvl := levels[i]
		hooks[lvl] = append(hooks[lvl], hook)
	}
}

// Fire runs, in registration order, every hook registered for level and
// stops at the first one that returns an error. It is used by `entry.log`
// to fire the appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
	registered := hooks[level]
	for i := 0; i < len(registered); i++ {
		if err := registered[i].Fire(entry); err != nil {
			return err
		}
	}
	return nil
}

只需实现该接口:

// Hook is the interface a custom hook has to implement.
type Hook interface {

// Levels returns the levels the hook is fired for.
Levels() []Level

// Fire handles one log entry.
Fire(*Entry) error

}

就会被logrus框架遍历调用已注册的 hook 的 Fire 方法

获取日志实例

// log_hook.go

package logger

import (

"fmt"

"github.com/sirupsen/logrus"

"library/util/constant"

"os"

)

// getLogger builds a new logrus logger for module: output to os.Stdout,
// level Debug, the file hook from newLogrusHook attached, and a JSON
// formatter with a "2006-01-02 15:04:05" timestamp.
func getLogger(module string) *logrus.Logger {
	l := logrus.New()
	l.Out = os.Stdout
	l.SetLevel(logrus.DebugLevel)

	// File output goes through a custom writer; the hook is lfshook.
	fileHook := newLogrusHook(constant.GetLogPath(), module)
	l.AddHook(fileHook)

	formatter := &logrus.JSONFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
	}
	l.SetFormatter(formatter)
	return l
}

// GetNewFieldLoggerContext creates a fresh logger for module on every call
// and returns an entry carrying the field "app" set to appField.
func GetNewFieldLoggerContext(module, appField string) *logrus.Entry {
	fields := logrus.Fields{
		"app": appField,
	}
	return getLogger(module).WithFields(fields)
}

// SubscribeLog subscribes to alert logs: it attaches a subscribe hook built
// from subMap to the logger behind entry and prints a confirmation.
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
	hook := newSubScribeHook(subMap)
	entry.Logger.AddHook(hook)
	fmt.Println("日志订阅成功")
}

constant.GetLogPath() 可以替换为自己的日志文件输出目录地址,比如我的mac上则是:/usr/local/log ,直接替换即可。

日志切片hook

代码

// writer.go

package logger

import (

"fmt"

"github.com/pkg/errors"

"io"

"library/util"

"os"

"path/filepath"

"sync"

"time"

)

// LogWriter is an io.Writer that writes to a time-sliced log file.
type LogWriter struct {

logDir string // root directory of the log files

module string // module name, used in the file name

curFileName string // file name currently selected

curBaseFileName string // file name currently in use

// turnCateDuration is the slice length the current time is truncated to when naming the file.
turnCateDuration time.Duration

// mutex is locked by Write.
mutex sync.RWMutex

// outFh is the currently open file handle.
outFh *os.File

}

// Write implements io.Writer. It writes p to the log file of the current
// time slice while holding the mutex.
//
// When the target file cannot be obtained it returns 0 and an error that
// wraps the underlying cause.
func (w *LogWriter) Write(p []byte) (n int, err error) {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	out, err := w.getWriter()
	if err != nil {
		// Keep the cause (directory creation or open failure) in the chain
		// instead of replacing it with a generic message.
		return 0, errors.Wrap(err, "failed to fetch target io.Writer")
	}
	return out.Write(p)
}

// getFileName returns the log file path for the current time slice:
// "<logDir>/<yyyy-mm-dd>/<module>_<hh>", computed from the current time
// truncated to turnCateDuration.
func (w *LogWriter) getFileName() string {
	slot := time.Now().Truncate(w.turnCateDuration)
	day, hour := slot.Format("2006-01-02"), slot.Format("15")
	return fmt.Sprintf("%s/%s/%s_%s", w.logDir, day, w.module, hour)
}

// getWriter returns the file handle for the current time slice.
//
// The open handle is reused for as long as the computed file name stays the
// same. When the name changes, or no file is open yet, the directory is
// created if needed, the new file is opened for appending and the previous
// handle is closed.
func (w *LogWriter) getWriter() (io.Writer, error) {
	fileName := w.getFileName()

	// Same slice as the last write: keep using the open file rather than
	// opening and closing a file on every single write.
	if w.outFh != nil && fileName == w.curBaseFileName {
		return w.outFh, nil
	}

	dirname := filepath.Dir(fileName)
	if err := os.MkdirAll(dirname, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
	}

	fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to open file %s", fileName)
	}

	// Switch to the new file only after it was opened successfully.
	if w.outFh != nil {
		_ = w.outFh.Close()
	}
	w.outFh = fileHandler
	w.curBaseFileName = fileName
	w.curFileName = fileName

	return fileHandler, nil
}

// New returns a LogWriter that writes logs of module under logPath, using
// duration as the time-slice length. No file is opened by New.
func New(logPath, module string, duration time.Duration) *LogWriter {
	w := new(LogWriter)
	w.logDir = logPath
	w.module = module
	w.turnCateDuration = duration
	w.curFileName = ""
	w.curBaseFileName = ""
	return w
}

// hook.go

package logger

import (

"github.com/rifflock/lfshook"

"github.com/sirupsen/logrus"

"time"

)

// newLogrusHook returns an lfshook hook that sends Debug, Info, Warn, Error,
// Fatal and Panic entries, formatted as uncoloured text, to one LogWriter
// with a two-hour time slice.
//
// Note that it also sets the level of the package-level logrus logger to Warn.
func newLogrusHook(logPath, moduel string) logrus.Hook {
	logrus.SetLevel(logrus.WarnLevel)

	// The writer picks (and creates) the target log file on every Write;
	// the lfshook hook calls Write when it is fired.
	writer := New(logPath, moduel, 2*time.Hour)

	writers := lfshook.WriterMap{}
	for _, lvl := range []logrus.Level{
		logrus.DebugLevel,
		logrus.InfoLevel,
		logrus.WarnLevel,
		logrus.ErrorLevel,
		logrus.FatalLevel,
		logrus.PanicLevel,
	} {
		writers[lvl] = writer
	}

	return lfshook.NewHook(writers, &logrus.TextFormatter{DisableColors: true})
}

测试代码

// TestGetLogger builds a logger entry and writes one Info message through
// its logger.
func TestGetLogger(t *testing.T) {
	entry := GetNewFieldLoggerContext("test", "d")
	entry.Logger.Info("????")
}

解析

logger实例持有了自定义的 io.Writer 结构体,在执行 hook 的 Fire 函数时会调用其 Write 方法,此时通过 Truncate 对时间进行切片,判断需要写入的文件,必要时创建新的文件。

注: 文章提供的代码是按天切分文件夹的,文件夹内模块日志再按2小时切分。可自行替换成按模块切分。

邮件警报hook

代码

// subscribeHook.go

package logger

import (

"fmt"

"github.com/sirupsen/logrus"

"library/email"

"strings"

)

// SubscribeMap maps a log level to the e-mail receivers subscribed to it.
type SubscribeMap map[logrus.Level][]*email.Receiver

// SubscribeHook is a logrus hook that e-mails the receivers subscribed to an entry's level.
type SubscribeHook struct {

// subMap holds the subscriptions consulted by Fire.
subMap SubscribeMap

}

// Levels makes the hook fire for every logrus level (logrus.AllLevels).
// A custom hook could be implemented here; a third-party hook is used for now.

func(h *SubscribeHook)Levels() []logrus.Level{

return logrus.AllLevels

}

// Fire e-mails the entry's message to the receivers subscribed to the
// entry's level. Nothing is sent when the level has no receivers.
// It always returns nil.
func (h *SubscribeHook) Fire(entry *logrus.Entry) error {
	receivers := h.subMap[entry.Level]
	if len(receivers) == 0 {
		return nil
	}

	subject := fmt.Sprintf("%s:[系统日志警报]", entry.Level.String())
	body := fmt.Sprintf("错误内容: %s", entry.Message)
	email.SendEmail(receivers, subject, body)
	return nil
}

// NewSubscribeMap builds a SubscribeMap that subscribes the addresses in
// receiverStr to level.
//
// receiverStr is a ';'-separated list such as "a@163.com;b@163.com".
// Surrounding whitespace is trimmed and empty items (an empty string, a
// trailing ';') are skipped, so they no longer turn into receivers with an
// empty address.
func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap {
	var receivers []*email.Receiver
	for _, address := range strings.Split(receiverStr, ";") {
		address = strings.TrimSpace(address)
		if address == "" {
			continue
		}
		receivers = append(receivers, &email.Receiver{Email: address})
	}
	return SubscribeMap{level: receivers}
}

func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {

return &SubscribeHook{subMap}

// email.go

package email

import (

"fmt"

"gopkg.in/gomail.v2"

"regexp"

"strconv"

)

// Sender holds the SMTP account settings and the message being sent.
type Sender struct {

// User is the account name; SendEmail also uses it as the From address.
User string

// Password is the account password.
Password string

// Host is the SMTP server host.
Host string

// Port is the SMTP server port.
Port int

// MailTo lists the recipient addresses.
MailTo []string

// Subject is the mail subject.
Subject string

// Content is the mail body; SendEmail sends it as text/html.
Content string

}

// Receiver is a single e-mail recipient.
type Receiver struct {
	Email string
}

// emailPattern matches a whole e-mail address. It is compiled once rather
// than on every Check call, and anchored with ^ and $ so that a string that
// merely contains an address (extra words, line breaks, header text) is
// rejected.
var emailPattern = regexp.MustCompile(`^\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*$`)

// Check reports whether r.Email is a syntactically valid e-mail address.
func (r *Receiver) Check() bool {
	return emailPattern.MatchString(r.Email)
}

// clean currently does nothing (empty body).
func (s *Sender) clean (){

}

// NewReceiver validates email and, when it is valid, makes it the sender's
// only recipient and returns the Receiver. For an invalid address it prints
// a message and returns nil.
func (s *Sender) NewReceiver(email string) *Receiver {
	rec := &Receiver{Email: email}
	if !rec.Check() {
		fmt.Printf("email check fail 【%s】\n", email)
		return nil
	}
	// Update the receiver s, not the package-level sender, so the method
	// works on any Sender value.
	s.MailTo = []string{email}
	return rec
}

// NewReceivers sets the sender's recipient list to the valid addresses in
// receivers. Invalid addresses are reported on stdout and skipped.
//
// The list is replaced rather than appended to, matching NewReceiver.
// Appending made the recipients of earlier calls accumulate.
func (s *Sender) NewReceivers(receivers []*Receiver) {
	mailTo := make([]string, 0, len(receivers))
	for _, rec := range receivers {
		if !rec.Check() {
			fmt.Printf("email check fail 【%s】\n", rec.Email)
			continue
		}
		mailTo = append(mailTo, rec.Email)
	}
	// Update the receiver s, not the package-level sender.
	s.MailTo = mailTo
}

// m is the package-level sender used by SendEmail. For a 163 mailbox the password is the key issued after SMTP is enabled. NOTE(review): account and password are hard-coded here — presumably placeholders to be replaced; confirm before use.

var m = Sender{User:"6666666@163.com", Password:"666666666", Host: "smtp.163.com", Port: 465}

// SendEmail sends an HTML mail with the given subject and content to the
// valid addresses in receivers, using the package-level sender m.
//
// Failures are printed to stdout; nothing is returned. When no receiver has
// a valid address, no mail is sent.
func SendEmail(receivers []*Receiver, subject, content string) {
	// Start from an empty recipient list. m is shared between calls, and
	// recipients of earlier mails must not be carried over into this one.
	m.MailTo = nil
	m.NewReceivers(receivers)
	m.Subject = subject
	m.Content = content

	if len(m.MailTo) == 0 {
		fmt.Printf("error 邮件发送错误! %s \n", "no valid receiver")
		return
	}

	e := gomail.NewMessage()
	e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
	e.SetHeader("To", m.MailTo...)    // send to several recipients
	e.SetHeader("Subject", m.Subject) // mail subject
	e.SetBody("text/html", m.Content) // mail body

	d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
	if err := d.DialAndSend(e); err != nil {
		fmt.Printf("error 邮件发送错误! %s \n", err.Error())
	}
}

使用

同理,在写日志时如果是错误级别的日志,则发送邮件。

o.logger = logger.GetNewFieldLoggerContext("test", "666")

if subscribeSocket {
	// Pass the same field that was assigned above (o.logger, a *logrus.Entry).
	logger.SubscribeLog(o.logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}

// o is the actual struct instance that owns the logger.

kafkahook

// kafka hook

package logger

import (

"github.com/sirupsen/logrus"

"library/kafka"

"library/util/constant"

)

// KafKaHook is a logrus hook that sends log messages to Kafka.
type KafKaHook struct {

// kafkaProducer is the producer Fire sends messages through.
kafkaProducer *kafka.KafkaProducer

}

// Levels makes the hook fire for every logrus level (logrus.AllLevels).
func(h *KafKaHook)Levels() []logrus.Level{

return logrus.AllLevels

}

// Fire sends the entry's message through the producer's synchronous send.
// The send result is not inspected; Fire always returns nil.
func (h *KafKaHook) Fire(entry *logrus.Entry) error {
	message := entry.Message
	_ = h.kafkaProducer.SendMsgSync(message)
	return nil
}

// newKafkaHook creates a KafKaHook backed by a synchronous producer for the
// topic constant.KafkaLogElkTopic.
func newKafkaHook() *KafKaHook {
	return &KafKaHook{
		kafkaProducer: kafka.NewKafkaProducer(constant.KafkaLogElkTopic, true),
	}
}

使用时logger.AddHook(newKafkaHook()) 即可

kafka模块

生产者

// kafkaProducer.go

package kafka

import (

"errors"

"fmt"

"github.com/Shopify/sarama"

"library/util/constant"

"log"

"time"

)

// GetKafkaAddress returns the list of Kafka broker addresses.
// The declared result is []string, so the single address is returned as a
// one-element slice. Returning the bare string did not compile.
func GetKafkaAddress() []string {
	return []string{"127.0.0.1:9092"}
}

// SyncProducer sends message to topic in synchronous mode.
//
// It creates a sync producer for the brokers from GetKafkaAddress, sends one
// message, prints partition and offset on success, and closes the producer
// before returning. Creation and send failures are returned as errors.
func SyncProducer(topic, message string) error {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Timeout = 5 * time.Second

	producer, err := sarama.NewSyncProducer(GetKafkaAddress(), cfg)
	if err != nil {
		return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	})
	if err != nil {
		return errors.New(fmt.Sprintf("send sdsds err=%s \n", err))
	}

	fmt.Printf("发送成功,partition=%d, offset=%d \n", partition, offset)
	return nil
}

// KafkaProducer wraps either an asynchronous or a synchronous sarama producer for one topic.

type KafkaProducer struct {

// topic is the topic every message is sent to.
topic string

// asyncProducer is set by initAsync.
asyncProducer *sarama.AsyncProducer

// syncProducer is set by initSync.
syncProducer *sarama.SyncProducer

// sync selects which of the two producers NewKafkaProducer initialises.
sync bool

}

// NewKafkaProducer creates a producer for topic and initialises it in
// synchronous mode when sync is true, otherwise in asynchronous mode.
// The result of the initialisation is not checked.
func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
	k := &KafkaProducer{topic: topic, sync: sync}

	switch {
	case sync:
		k.initSync()
	default:
		k.initAsync()
	}
	return k
}

// initAsync creates the asynchronous sarama producer and starts a goroutine
// that drains its success and error channels. It reports whether the
// producer is ready; it refuses to run on a producer created in sync mode.
func (k *KafkaProducer) initAsync() bool {
	if k.sync {
		fmt.Printf("sync producer cant call async func !\n")
		return false
	}

	config := sarama.NewConfig()
	// Wait until all replicas have stored the message before acknowledging.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Send messages to a random partition.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Deliver successes and errors on the producer's channels. This only
	// has an effect when RequiredAcks is not NoResponse.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// Kafka version in use. Below V0_10_0_0 the message timestamp has no
	// effect; consumer and producer must be configured consistently. A wrong
	// version makes Kafka return odd errors and sending fails.
	config.Version = sarama.V0_10_0_1

	producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
	if e != nil {
		fmt.Println(e)
		return false
	}
	// The producer must stay open after init returns. The original deferred
	// AsyncClose here, which shut it down before any message could be sent.
	k.asyncProducer = &producer

	go func() {
		successes, errs := producer.Successes(), producer.Errors()
		// Drain both channels until they are closed. A closed channel is set
		// to nil so that it is no longer selected and the goroutine can end.
		for successes != nil || errs != nil {
			select {
			case _, ok := <-successes:
				if !ok {
					successes = nil
				}
			case fail, ok := <-errs:
				if !ok {
					errs = nil
					continue
				}
				fmt.Printf("err: %s \n", fail.Err.Error())
			}
		}
	}()

	return true
}

// initSync creates the synchronous sarama producer. It reports whether the
// producer is ready; it refuses to run on a producer created in async mode.
func (k *KafkaProducer) initSync() bool {
	if !k.sync {
		fmt.Println("async producer cant call sync func !")
		return false
	}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second

	p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return false
	}
	// Store the producer only after the error check, so syncProducer never
	// points at a nil producer.
	k.syncProducer = &p
	return true
}

// SendMsgAsync queues sendStr for the configured topic on the asynchronous
// producer's input channel.
//
// When the asynchronous producer was never initialised (sync-mode producer
// or failed init), the message is dropped with a printed error instead of
// panicking on a nil pointer.
func (k *KafkaProducer) SendMsgAsync(sendStr string) {
	if k.asyncProducer == nil || *k.asyncProducer == nil {
		fmt.Printf("err: %s \n", "async producer is not initialised")
		return
	}

	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		// Convert the string into bytes.
		Value: sarama.ByteEncoder(sendStr),
	}

	// Send through the producer's input channel.
	pd := *k.asyncProducer
	pd.Input() <- msg
}

// SendMsgSync sends sendStr to the configured topic through the synchronous
// producer and reports whether the send succeeded. The outcome is also
// printed to stdout.
//
// When the synchronous producer was never initialised (async-mode producer
// or failed init), it returns false instead of panicking on a nil pointer.
func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
	if k.syncProducer == nil || *k.syncProducer == nil {
		fmt.Printf("发送失败 send message(%s) err=%s \n", sendStr, "sync producer is not initialised")
		return false
	}

	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}

	pd := *k.syncProducer
	part, offset, err := pd.SendMessage(msg)
	if err != nil {
		fmt.Printf("发送失败 send message(%s) err=%s \n", sendStr, err)
		return false
	}

	fmt.Printf("发送成功 partition=%d, offset=%d \n", part, offset)
	return true
}

调用 SendMsgSync 或 SendMsgAsync 生产消息,注意初始化时的参数要保证一致!

消费者组

// kafkaConsumerGroup.go

package kafka

import (

"context"

"fmt"

"github.com/Shopify/sarama"

"log"

"sync"

)

// NewKafkaConsumerGroup builds a consumer group for topics under the group
// id group, with businessCall as the per-message handler, and calls Init on
// it before returning. The function returned by Init is discarded here.
func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
	consumer := new(KafkaConsumerGroup)
	consumer.brokers = GetKafkaAddress()
	consumer.topics = topics
	consumer.group = group
	consumer.channelBufferSize = 2
	consumer.ready = make(chan bool)
	consumer.version = "1.1.1"
	consumer.handler = businessCall

	consumer.Init()
	return consumer
}

// KafkaConsumerGroup is a consumer-group member. Consumers with the same group id form one consumer group;

// every consumer needs a group id, and each message is consumed by only one

// consumer within a group but can be consumed by several consumer groups.

type KafkaConsumerGroup struct {

// brokers: one Kafka server is called a broker.

brokers []string

// topics: a topic is a logical grouping of messages; messages of the same topic share one queue.

topics []string

// version is the Kafka version string parsed in Init.
version string

// ready is closed by Setup; Init waits on it.
ready chan bool

// group is the consumer group id.
group string

// channelBufferSize is copied into the sarama config's ChannelBufferSize.
channelBufferSize int

// handler is the business callback invoked for every message.

handler func(message *sarama.ConsumerMessage) bool

}

// Init creates the consumer-group client, starts consuming in a background
// goroutine and blocks until the consumer has been set up.
//
// It returns a shutdown function that cancels consumption, waits for the
// goroutine to finish and closes the client. It should be called when the
// program exits, so that messages still in the channel get consumed.
//
// When the Kafka version cannot be parsed or the client cannot be created,
// the error is printed and a no-op shutdown function is returned. The
// original carried on and panicked on the nil client.
func (k *KafkaConsumerGroup) Init() func() {
	noop := func() {}

	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		fmt.Printf("Error parsing Kafka version: %v", err)
		return noop
	}

	cfg := sarama.NewConfig()
	cfg.Version = version
	// Partition assignment strategy.
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// Where to start consuming when the group has no committed offset.
	cfg.Consumer.Offsets.Initial = -2
	// Channel length.
	cfg.ChannelBufferSize = k.channelBufferSize

	client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
	if err != nil {
		fmt.Printf("Error creating consumer group client: %v", err)
		return noop
	}

	ctx, cancel := context.WithCancel(context.Background())

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			// A cancelled context signals that the consumer should stop.
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()

	<-k.ready
	fmt.Printf("Sarama consumer up and running!... \n")

	return func() {
		cancel()
		wg.Wait()
		if err := client.Close(); err != nil {
			fmt.Printf("Error closing client: %v \n", err)
		}
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim. It closes k.ready, which unblocks the wait in Init.

func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {

// Mark the consumer as ready

close(k.ready)

return nil

}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited. It does nothing and returns nil.

func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {

return nil

}

// ConsumeClaim runs the consumer loop over the claim's Messages() channel.
//
// Each message is passed to the business handler, and the message is marked
// as consumed only when the handler returns true. The loop ends when the
// message channel is closed.
//
// NOTE: do not move the loop into a goroutine. ConsumeClaim itself is
// already called within a goroutine, see
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		if handled := k.handler(message); !handled {
			continue
		}
		// Advance the offset.
		session.MarkMessage(message, "")
	}
	return nil
}

测试代码

func TestKafkaConsumerGroup_Init(t *testing.T) {

//pd := NewKafkaProducer("test-fail",true)

//pd.InitSync()

k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {

fmt.Println(string(message.Value))

//如果失败的处理逻辑

//if ok := pd.SendMsgSync("666666"); ok {

// return true

//}

return false

})

consumerDone := k.Init()

sigterm := make(chan os.Signal, 1)

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

select {

case <-sigterm:

fmt.Println("terminating: via signal")

}

consumerDone()

}

这里有一些补偿逻辑在里面。

以上就是logrus相关hook。

好了,这篇logrus hook输出日志到本地磁盘的操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

以上是 logrus hook输出日志到本地磁盘的操作 的全部内容, 来源链接: utcz.com/p/235650.html

回到顶部