Writing logs to local disk with a logrus hook
Author: 焗个面包 · Published: 2024-04-26 17:34:47
logrus is a logging framework for Go, and its most exciting feature is the hook mechanism: by registering hooks when logrus is initialized, you can extend it in all sorts of directions, shipping logs to middleware such as Elasticsearch or ActiveMQ, or even to your email and DingTalk. (Don't ask how I know logs can end up in DingTalk; there were tears.)
Back to the point: here we simply use the hook mechanism to write logs to local disk.
First, install the package:
go get github.com/sirupsen/logrus
Then a quick look at the API: logrus provides six log levels, which you can call directly on the package:
logrus.Debug("Useful debugging information.")
logrus.Info("Something noteworthy happened!")
logrus.Warn("You should probably take a look at this.")
logrus.Error("Something failed but I'm not quitting.")
logrus.Fatal("Bye.") // calls os.Exit(1) after logging
logrus.Panic("I'm bailing.") // panics after logging
Example project structure:
main.go
package main

import (
    "github.com/sirupsen/logrus"
    "logT/logS"
)

func main() {
    // Create a hook, passing in the path where logs should be stored.
    hook := logS.NewHook("d:/log/golog.log")
    // Log once before the hook is added.
    logrus.WithField("file", "d:/log/golog.log").Info("New logrus hook.")
    logrus.AddHook(hook)
    // Log again after the hook is added.
    logrus.WithFields(logrus.Fields{
        "animal": "walrus",
    }).Info("A walrus appears")
}
hook.go
The three Go files below look long, but most of it is boilerplate; only the NewHook function needs tailoring to your own needs.
package logS

import (
    "fmt"
    "os"
    "strings"

    "github.com/sirupsen/logrus"
)

// Hook is a logrus hook that writes entries to a file.
type Hook struct {
    W LoggerInterface
}

func NewHook(file string) (f *Hook) {
    w := NewFileWriter()
    config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`, file)
    err := w.Init(config)
    if err != nil {
        return nil
    }
    return &Hook{w}
}

// Fire implements the logrus Hook interface.
func (hook *Hook) Fire(entry *logrus.Entry) (err error) {
    message, err := getMessage(entry)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
        return err
    }
    switch entry.Level {
    case logrus.PanicLevel:
        fallthrough
    case logrus.FatalLevel:
        fallthrough
    case logrus.ErrorLevel:
        return hook.W.WriteMsg(fmt.Sprintf("[ERROR] %s", message), LevelError)
    case logrus.WarnLevel:
        return hook.W.WriteMsg(fmt.Sprintf("[WARN] %s", message), LevelWarn)
    case logrus.InfoLevel:
        return hook.W.WriteMsg(fmt.Sprintf("[INFO] %s", message), LevelInfo)
    case logrus.DebugLevel:
        return hook.W.WriteMsg(fmt.Sprintf("[DEBUG] %s", message), LevelDebug)
    default:
        return nil
    }
}

// Levels implements the logrus Hook interface.
func (hook *Hook) Levels() []logrus.Level {
    return []logrus.Level{
        logrus.PanicLevel,
        logrus.FatalLevel,
        logrus.ErrorLevel,
        logrus.WarnLevel,
        logrus.InfoLevel,
        logrus.DebugLevel,
    }
}

func getMessage(entry *logrus.Entry) (message string, err error) {
    message = message + fmt.Sprintf("%s ", entry.Message)
    file, lineNumber := GetCallerIgnoringLogMulti(2)
    if file != "" {
        sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
        fileName := strings.Split(file, sep)
        if len(fileName) >= 2 {
            file = fileName[1]
        }
    }
    message = fmt.Sprintf("%s:%d ", file, lineNumber) + message
    for k, v := range entry.Data {
        message = message + fmt.Sprintf("%v:%v ", k, v)
    }
    return
}
caller.go
package logS

import (
    "runtime"
    "strings"
)

func GetCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
    // bump by 1 to ignore the GetCaller (this) stack frame
    callDepth++
outer:
    for {
        var ok bool
        _, file, line, ok = runtime.Caller(callDepth)
        if !ok {
            file = "???"
            line = 0
            break
        }
        for _, s := range suffixesToIgnore {
            if strings.HasSuffix(file, s) {
                callDepth++
                continue outer
            }
        }
        break
    }
    return
}

// GetCallerIgnoringLogMulti returns the calling file and line, skipping logrus-internal frames.
func GetCallerIgnoringLogMulti(callDepth int) (string, int) {
    // the +1 is to ignore this (GetCallerIgnoringLogMulti) frame
    return GetCaller(callDepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
}
file.go
package logS

import (
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
    "time"
)

// RFC5424 log message levels.
const (
    LevelError = iota
    LevelWarn
    LevelInfo
    LevelDebug
)

// LoggerInterface is the logger contract the hook writes through.
type LoggerInterface interface {
    Init(config string) error
    WriteMsg(msg string, level int) error
    Destroy()
    Flush()
}

// LogWriter implements LoggerInterface.
// It writes messages subject to a line limit, a file-size limit, or a time frequency.
type LogWriter struct {
    *log.Logger
    mw *MuxWriter
    // The opened file
    Filename         string `json:"filename"`
    Maxlines         int    `json:"maxlines"`
    maxlinesCurlines int
    // Rotate at size
    Maxsize        int `json:"maxsize"`
    maxsizeCursize int
    // Rotate daily
    Daily         bool  `json:"daily"`
    Maxdays       int64 `json:"maxdays"`
    dailyOpendate int
    Rotate        bool       `json:"rotate"`
    startLock     sync.Mutex // Only one goroutine can write to the file
    Level         int        `json:"level"`
}

// MuxWriter is an *os.File writer with a lock.
type MuxWriter struct {
    sync.Mutex
    fd *os.File
}

// Write writes to the underlying os.File.
func (l *MuxWriter) Write(b []byte) (int, error) {
    l.Lock()
    defer l.Unlock()
    return l.fd.Write(b)
}

// SetFd sets the os.File in the writer.
func (l *MuxWriter) SetFd(fd *os.File) {
    if l.fd != nil {
        _ = l.fd.Close()
    }
    l.fd = fd
}

// NewFileWriter creates a file LogWriter returned as a LoggerInterface.
func NewFileWriter() LoggerInterface {
    w := &LogWriter{
        Filename: "",
        Maxlines: 1000000,
        Maxsize:  1 << 28, // 256 MB
        Daily:    true,
        Maxdays:  7,
        Rotate:   true,
        Level:    LevelDebug,
    }
    // use MuxWriter instead of os.File directly, so writes are locked during rotation
    w.mw = new(MuxWriter)
    // set MuxWriter as Logger's io.Writer
    w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
    return w
}

// Init initializes the file logger with a JSON config such as:
//    {
//        "filename": "logs/sample.log",
//        "maxlines": 10000,
//        "maxsize": 1<<30,
//        "daily": true,
//        "maxdays": 15,
//        "rotate": true
//    }
func (w *LogWriter) Init(jsonconfig string) error {
    err := json.Unmarshal([]byte(jsonconfig), w)
    if err != nil {
        return err
    }
    if len(w.Filename) == 0 {
        return errors.New("jsonconfig must have filename")
    }
    err = w.startLogger()
    return err
}

// startLogger creates the log file and hands it to the locked file writer.
func (w *LogWriter) startLogger() error {
    fd, err := w.createLogFile()
    if err != nil {
        return err
    }
    w.mw.SetFd(fd)
    err = w.initFd()
    if err != nil {
        return err
    }
    return nil
}

func (w *LogWriter) docheck(size int) {
    w.startLock.Lock()
    defer w.startLock.Unlock()
    if w.Rotate && ((w.Maxlines > 0 && w.maxlinesCurlines >= w.Maxlines) ||
        (w.Maxsize > 0 && w.maxsizeCursize >= w.Maxsize) ||
        (w.Daily && time.Now().Day() != w.dailyOpendate)) {
        if err := w.DoRotate(); err != nil {
            fmt.Fprintf(os.Stderr, "FileLogWriter(%q): %s\n", w.Filename, err)
            return
        }
    }
    w.maxlinesCurlines++
    w.maxsizeCursize += size
}

// WriteMsg writes a logger message into the file.
func (w *LogWriter) WriteMsg(msg string, level int) error {
    if level > w.Level {
        return nil
    }
    n := 24 + len(msg) // 24 is the length of a prefix like "2013/06/23 21:00:22 [T] "
    w.docheck(n)
    w.Logger.Print(msg)
    return nil
}

func (w *LogWriter) createLogFile() (*os.File, error) {
    // Open the log file
    fd, err := os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
    return fd, err
}

func (w *LogWriter) initFd() error {
    fd := w.mw.fd
    finfo, err := fd.Stat()
    if err != nil {
        return fmt.Errorf("get stat err: %s", err)
    }
    w.maxsizeCursize = int(finfo.Size())
    w.dailyOpendate = time.Now().Day()
    if finfo.Size() > 0 {
        content, err := ioutil.ReadFile(w.Filename)
        if err != nil {
            return err
        }
        w.maxlinesCurlines = len(strings.Split(string(content), "\n"))
    } else {
        w.maxlinesCurlines = 0
    }
    return nil
}

// DoRotate rotates the log: the current file is renamed and writing resumes
// in a fresh file. Rotated names look like xx.log.2013-01-01.001.
func (w *LogWriter) DoRotate() error {
    _, err := os.Lstat(w.Filename)
    if err == nil { // file exists
        // Find the next available number
        num := 1
        fname := ""
        for ; err == nil && num <= 999; num++ {
            fname = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
            _, err = os.Lstat(fname)
        }
        // return an error if the last file checked still existed
        if err == nil {
            return fmt.Errorf("Rotate: Cannot find free log number to rename %s", w.Filename)
        }
        // block Logger's io.Writer
        w.mw.Lock()
        defer w.mw.Unlock()
        // close fd before rename
        fd := w.mw.fd
        _ = fd.Close()
        // Rename the file to its newfound home
        err = os.Rename(w.Filename, fname)
        if err != nil {
            return fmt.Errorf("Rotate: %s", err)
        }
        // re-start logger
        err = w.startLogger()
        if err != nil {
            return fmt.Errorf("Rotate StartLogger: %s", err)
        }
        go w.deleteOldLog()
    }
    return nil
}

func (w *LogWriter) deleteOldLog() {
    dir := filepath.Dir(w.Filename)
    _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
        defer func() {
            if r := recover(); r != nil {
                returnErr = fmt.Errorf("Unable to delete old log '%s', error: %+v", path, r)
                fmt.Println(returnErr)
            }
        }()
        if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.Maxdays) {
            if strings.HasPrefix(filepath.Base(path), filepath.Base(w.Filename)) {
                _ = os.Remove(path)
            }
        }
        return
    })
}

// Destroy closes the file writer.
func (w *LogWriter) Destroy() {
    _ = w.mw.fd.Close()
}

// Flush syncs the file to disk; the file logger buffers no messages in memory.
func (w *LogWriter) Flush() {
    _ = w.mw.fd.Sync()
}
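Incidentally, the file writer can also be driven directly, without logrus, which makes the JSON config accepted by Init easy to experiment with. A minimal sketch, assuming the logS package above is importable as logT/logS (the import path used by main.go):

package main

import "logT/logS"

func main() {
    w := logS.NewFileWriter()
    // The same JSON shape that NewHook builds: rotate daily, keep 7 days.
    if err := w.Init(`{"filename":"sample.log","maxdays":7,"rotate":true}`); err != nil {
        panic(err)
    }
    defer w.Destroy()
    // Level gating happens inside WriteMsg: anything above w.Level is dropped.
    _ = w.WriteMsg("[INFO] written directly through LogWriter", logS.LevelInfo)
    w.Flush()
}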
Further reading: writing custom logrus hooks in Go (log slicing, email alerts, Kafka)
Analyzing the logrus Hook mechanism
The logrus hook interface definition is simple:
package logrus

// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
    Levels() []Level
    Fire(*Entry) error
}

// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook

// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
    for _, level := range hook.Levels() {
        hooks[level] = append(hooks[level], hook)
    }
}

// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
    for _, hook := range hooks[level] {
        if err := hook.Fire(entry); err != nil {
            return err
        }
    }
    return nil
}
You only need to implement this interface:
type Hook interface {
    Levels() []Level
    Fire(*Entry) error
}
and logrus will then iterate over all registered hooks and invoke each one's Fire method.
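To make that contract concrete before the article's fuller examples, here is a minimal hook of my own (StderrHook is not from the article): it satisfies the two methods and echoes error-and-above entries to stderr.

package main

import (
    "fmt"
    "os"

    "github.com/sirupsen/logrus"
)

// StderrHook echoes error-and-above entries to stderr.
type StderrHook struct{}

// Levels tells logrus which levels this hook fires on.
func (h *StderrHook) Levels() []logrus.Level {
    return []logrus.Level{logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel}
}

// Fire is invoked by logrus for every entry at one of the levels above.
func (h *StderrHook) Fire(entry *logrus.Entry) error {
    line, err := entry.String() // format the entry with the logger's formatter
    if err != nil {
        return err
    }
    _, err = fmt.Fprint(os.Stderr, line)
    return err
}

func main() {
    logrus.AddHook(&StderrHook{})
    logrus.Error("this entry also goes to stderr")
}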
Getting a logger instance
// log_hook.go
package logger

import (
    "fmt"
    "os"

    "github.com/sirupsen/logrus"
    "library/util/constant"
)

// Build a logrus logger with our custom hooks attached.
func getLogger(module string) *logrus.Logger {
    // Instantiate a fresh logger.
    logger := logrus.New()
    // Set the output.
    logger.Out = os.Stdout
    // Set the log level.
    logger.SetLevel(logrus.DebugLevel)
    // Set the log format.
    // We only implement a custom writer; the hook itself is delegated to lfshook.
    logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
    logger.SetFormatter(&logrus.JSONFormatter{
        TimestampFormat: "2006-01-02 15:04:05",
    })
    return logger
}

// GetNewFieldLoggerContext ensures each call gets its own logger instance.
func GetNewFieldLoggerContext(module, appField string) *logrus.Entry {
    logger := getLogger(module)
    return logger.WithFields(logrus.Fields{
        "app": appField,
    })
}

// SubscribeLog subscribes receivers to alert-level log entries.
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
    logger := entry.Logger
    logger.AddHook(newSubScribeHook(subMap))
    fmt.Println("log subscription registered")
}
constant.GetLogPath() can be replaced with your own log output directory; on my Mac, for example, it is /usr/local/log.
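The constant package itself isn't shown in the article; a hypothetical stand-in carrying the names used later could look like this (the paths and topic values are made-up placeholders, not the author's actual code):

// library/util/constant/constant.go (hypothetical stand-in)
package constant

// Kafka topic names referenced by the hooks and tests below (placeholders).
const (
    KafkaLogElkTopic = "log-elk"
    KafkaALiSdkTopic = "ali-sdk"
)

// GetLogPath returns the root directory for log output.
func GetLogPath() string {
    return "/usr/local/log"
}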
Log-slicing hook
Code
// writer.go
package logger

import (
    "fmt"
    "io"
    "os"
    "path/filepath"
    "sync"
    "time"

    "github.com/pkg/errors"
)

type LogWriter struct {
    logDir           string        // root directory for logs
    module           string        // module name
    curFileName      string        // the currently assigned file name
    curBaseFileName  string        // the file currently in use
    turnCateDuration time.Duration // truncation window for slicing
    mutex            sync.RWMutex
    outFh            *os.File
}

func (w *LogWriter) Write(p []byte) (n int, err error) {
    w.mutex.Lock()
    defer w.mutex.Unlock()
    out, err := w.getWriter()
    if err != nil {
        return 0, errors.New("failed to fetch target io.Writer")
    }
    return out.Write(p)
}

func (w *LogWriter) getFileName() string {
    base := time.Now().Truncate(w.turnCateDuration)
    return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
}

func (w *LogWriter) getWriter() (io.Writer, error) {
    // Compute the file name for the current time slice; when the slice
    // rolls over, it differs from the file currently in use.
    fileName := w.getFileName()
    if fileName == w.curBaseFileName && w.outFh != nil {
        return w.outFh, nil // still inside the current slice, reuse the handle
    }
    dirname := filepath.Dir(fileName)
    if err := os.MkdirAll(dirname, 0755); err != nil {
        return nil, errors.Wrapf(err, "failed to create directory %s", dirname)
    }
    fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
    if err != nil {
        return nil, errors.Errorf("failed to open file %s", err)
    }
    if w.outFh != nil {
        w.outFh.Close()
    }
    w.outFh = fileHandler
    w.curBaseFileName = fileName
    w.curFileName = fileName
    return fileHandler, nil
}

func New(logPath, module string, duration time.Duration) *LogWriter {
    return &LogWriter{
        logDir:           logPath,
        module:           module,
        turnCateDuration: duration,
        curFileName:      "",
        curBaseFileName:  "",
    }
}
// hook.go
package logger

import (
    "time"

    "github.com/rifflock/lfshook"
    "github.com/sirupsen/logrus"
)

func newLogrusHook(logPath, module string) logrus.Hook {
    logrus.SetLevel(logrus.WarnLevel)
    writer := New(logPath, module, time.Hour*2)
    lfsHook := lfshook.NewHook(lfshook.WriterMap{
        logrus.DebugLevel: writer,
        logrus.InfoLevel:  writer,
        logrus.WarnLevel:  writer,
        logrus.ErrorLevel: writer,
        logrus.FatalLevel: writer,
        logrus.PanicLevel: writer,
    }, &logrus.TextFormatter{DisableColors: true})
    // The writer decides which log file to create; lfshook consumes it from
    // its Fire method. Since writer implements io.Writer, it can preprocess
    // every write before the bytes hit disk.
    return lfsHook
}
Test code
func TestGetLogger(t *testing.T) {
    lg := GetNewFieldLoggerContext("test", "d")
    lg.Logger.Info("hello")
}
Analysis
The logger instance holds the custom io.Writer struct. When a hook's Fire method is consumed, Write is called, and the Truncate-based time-slicing logic works out which file to write to, creating a new one when the slice rolls over.
Note: the code in this article shards directories by day, and module logs inside each directory are further sliced every 2 hours. You can swap this for slicing by module.
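To see how the 2-hour slicing resolves file names, here is a small self-contained sketch of the same Truncate logic (the directory and module name are made up):

package main

import (
    "fmt"
    "time"
)

func sliceName(logDir, module string, d time.Duration, now time.Time) string {
    base := now.Truncate(d) // round down to the start of the current slice
    return fmt.Sprintf("%s/%s/%s_%s", logDir, base.Format("2006-01-02"), module, base.Format("15"))
}

func main() {
    d := 2 * time.Hour
    t1 := time.Date(2024, 4, 26, 13, 5, 0, 0, time.UTC)
    t2 := time.Date(2024, 4, 26, 13, 55, 0, 0, time.UTC)
    t3 := time.Date(2024, 4, 26, 14, 10, 0, 0, time.UTC)
    // 13:05 and 13:55 land in the same slice; 14:10 starts a new file.
    fmt.Println(sliceName("/usr/local/log", "test", d, t1)) // .../2024-04-26/test_12
    fmt.Println(sliceName("/usr/local/log", "test", d, t2)) // .../2024-04-26/test_12
    fmt.Println(sliceName("/usr/local/log", "test", d, t3)) // .../2024-04-26/test_14
}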
Email-alert hook
Code
// subscribeHook.go
package logger

import (
    "fmt"
    "strings"

    "github.com/sirupsen/logrus"
    "library/email"
)

type SubscribeMap map[logrus.Level][]*email.Receiver

type SubscribeHook struct {
    subMap SubscribeMap
}

// A hand-rolled hook; a third-party hook could serve here as well.
func (h *SubscribeHook) Levels() []logrus.Level {
    return logrus.AllLevels
}

func (h *SubscribeHook) Fire(entry *logrus.Entry) error {
    for level, receivers := range h.subMap {
        // Level matched: fire off the alert.
        if level == entry.Level {
            if len(receivers) > 0 {
                email.SendEmail(receivers, fmt.Sprintf("%s:[system log alert]", entry.Level.String()),
                    fmt.Sprintf("error message: %s", entry.Message))
            }
        }
    }
    return nil
}

func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap {
    subMap := SubscribeMap{}
    addressList := strings.Split(receiverStr, ";")
    var receivers []*email.Receiver
    for _, address := range addressList {
        receivers = append(receivers, &email.Receiver{Email: address})
    }
    subMap[level] = receivers
    return subMap
}

func newSubScribeHook(subMap SubscribeMap) *SubscribeHook {
    return &SubscribeHook{subMap}
}
// email.go
package email

import (
    "fmt"
    "regexp"

    "gopkg.in/gomail.v2"
)

type Sender struct {
    User     string
    Password string
    Host     string
    Port     int
    MailTo   []string
    Subject  string
    Content  string
}

type Receiver struct {
    Email string
}

// Check validates the email address format.
func (r *Receiver) Check() bool {
    pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` // matches an email address
    reg := regexp.MustCompile(pattern)
    return reg.MatchString(r.Email)
}

func (s *Sender) clean() {
}

// NewReceiver checks the address and registers it as the sole recipient.
func (s *Sender) NewReceiver(email string) *Receiver {
    rec := &Receiver{Email: email}
    if rec.Check() {
        m.MailTo = []string{email}
        return rec
    }
    fmt.Printf("email check fail 【%s】\n", email)
    return nil
}

// NewReceivers checks each address and appends the valid ones.
func (s *Sender) NewReceivers(receivers []*Receiver) {
    for _, rec := range receivers {
        if rec.Check() {
            m.MailTo = append(m.MailTo, rec.Email)
        } else {
            fmt.Printf("email check fail 【%s】\n", rec.Email)
        }
    }
}

// For a 163 mailbox, Password is the key issued when SMTP is enabled.
var m = Sender{User: "6666666@163.com", Password: "666666666", Host: "smtp.163.com", Port: 465}

func SendEmail(receivers []*Receiver, subject, content string) {
    m.NewReceivers(receivers)
    m.Subject = subject
    m.Content = content
    e := gomail.NewMessage()
    e.SetHeader("From", e.FormatAddress(m.User, "hengsheng"))
    e.SetHeader("To", m.MailTo...)    // send to multiple recipients
    e.SetHeader("Subject", m.Subject) // subject line
    e.SetBody("text/html", m.Content) // message body
    d := gomail.NewDialer(m.Host, m.Port, m.User, m.Password)
    if err := d.DialAndSend(e); err != nil {
        fmt.Printf("error sending mail! %s \n", err.Error())
    }
}
Usage
In the same vein, when an error-level entry is written, an email goes out.
o.logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribeSocket {
    logger.SubscribeLog(o.logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
// o is the enclosing struct instance
Kafka hook
// kafka hook
package logger

import (
    "github.com/sirupsen/logrus"
    "library/kafka"
    "library/util/constant"
)

type KafKaHook struct {
    kafkaProducer *kafka.KafkaProducer
}

func (h *KafKaHook) Levels() []logrus.Level {
    return logrus.AllLevels
}

func (h *KafKaHook) Fire(entry *logrus.Entry) error {
    h.kafkaProducer.SendMsgSync(entry.Message)
    return nil
}

func newKafkaHook() *KafKaHook {
    producer := kafka.NewKafkaProducer(constant.KafkaLogElkTopic, true)
    return &KafKaHook{kafkaProducer: producer}
}
To use it, just call logger.AddHook(newKafkaHook()).
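For instance, wiring it in next to the file hook from log_hook.go might look like this sketch (getLoggerWithKafka is a made-up name, not from the article; it lives in the same logger package):

// logger/kafka_wiring.go (sketch, assuming the logger package above)
package logger

import (
    "github.com/sirupsen/logrus"
    "library/util/constant"
)

// getLoggerWithKafka builds a logger whose entries go to the sliced files
// and, via the Kafka hook, to the log topic as well.
func getLoggerWithKafka(module string) *logrus.Logger {
    logger := logrus.New()
    logger.SetLevel(logrus.DebugLevel)
    logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
    logger.AddHook(newKafkaHook()) // each entry's Message is produced synchronously
    return logger
}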
The kafka module
Producer
// kafkaProducer.go
package kafka

import (
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/Shopify/sarama"
)

func GetKafkaAddress() []string {
    return []string{"127.0.0.1:9092"}
}

// SyncProducer sends one message in synchronous mode.
func SyncProducer(topic, message string) error {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
    if err != nil {
        return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
    }
    defer p.Close()
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(message),
    }
    part, offset, err := p.SendMessage(msg)
    if err != nil {
        return errors.New(fmt.Sprintf("send message err=%s \n", err))
    }
    fmt.Printf("sent OK, partition=%d, offset=%d \n", part, offset)
    return nil
}

// KafkaProducer wraps either an async or a sync producer.
type KafkaProducer struct {
    topic         string
    asyncProducer *sarama.AsyncProducer
    syncProducer  *sarama.SyncProducer
    sync          bool
}

func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
    k := &KafkaProducer{
        topic: topic,
        sync:  sync,
    }
    if sync {
        k.initSync()
    } else {
        k.initAsync()
    }
    return k
}

func (k *KafkaProducer) initAsync() bool {
    if k.sync {
        fmt.Printf("sync producer cant call async func !\n")
        return false
    }
    config := sarama.NewConfig()
    // Wait until all in-sync replicas have acknowledged the message.
    config.Producer.RequiredAcks = sarama.WaitForAll
    // Send messages to a random partition.
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // Whether to wait for success/error responses; only meaningful when
    // RequiredAcks above is not NoResponse.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    // Set the Kafka version in use. Below V0_10_0_0 the message timestamp is
    // ignored; producer and consumer must agree on this setting. Beware: a
    // wrong version yields strange errors and messages fail to send.
    config.Version = sarama.V0_10_0_1
    producer, e := sarama.NewAsyncProducer(GetKafkaAddress(), config)
    if e != nil {
        fmt.Println(e)
        return false
    }
    k.asyncProducer = &producer
    // Note: do NOT `defer producer.AsyncClose()` here; that would shut the
    // producer down as soon as this function returns. Close it on application
    // shutdown instead.
    pd := *k.asyncProducer
    go func() {
        for {
            select {
            case <-pd.Successes():
                // fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-pd.Errors():
                fmt.Printf("err: %s \n", fail.Err.Error())
            }
        }
    }()
    return true
}

func (k *KafkaProducer) initSync() bool {
    if !k.sync {
        fmt.Println("async producer cant call sync func !")
        return false
    }
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(GetKafkaAddress(), config)
    if err != nil {
        log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return false
    }
    k.syncProducer = &p
    return true
}

func (k *KafkaProducer) SendMsgAsync(sendStr string) {
    msg := &sarama.ProducerMessage{
        Topic: k.topic,
    }
    // Convert the string into a byte slice.
    msg.Value = sarama.ByteEncoder(sendStr)
    // Send through the producer's input channel.
    pd := *k.asyncProducer
    pd.Input() <- msg
}

func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
    msg := &sarama.ProducerMessage{
        Topic: k.topic,
        Value: sarama.ByteEncoder(sendStr),
    }
    pd := *k.syncProducer
    part, offset, err := pd.SendMessage(msg)
    if err != nil {
        fmt.Printf("send failed: message(%s) err=%s \n", sendStr, err)
        return false
    }
    fmt.Printf("sent OK, partition=%d, offset=%d \n", part, offset)
    return true
}
Produce messages with SendMsgSync or SendMsgAsync, and make sure the sync flag passed at construction matches the send method you call!
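A quick usage sketch (the app-log topic name is a placeholder):

package main

import "library/kafka"

func main() {
    // sync=true pairs with SendMsgSync; sync=false pairs with SendMsgAsync.
    syncProd := kafka.NewKafkaProducer("app-log", true)
    syncProd.SendMsgSync("hello, sync") // blocks until the broker acks

    asyncProd := kafka.NewKafkaProducer("app-log", false)
    asyncProd.SendMsgAsync("hello, async") // queued on the input channel
}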
Consumer group
// kafkaConsumerGroup.go
package kafka

import (
    "context"
    "fmt"
    "log"
    "sync"

    "github.com/Shopify/sarama"
)

func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
    // Init is left to the caller, which receives the shutdown func
    // (see the test below).
    return &KafkaConsumerGroup{
        brokers:           GetKafkaAddress(),
        topics:            topics,
        group:             group,
        channelBufferSize: 2,
        ready:             make(chan bool),
        version:           "1.1.1",
        handler:           businessCall,
    }
}

// Consumer group: consumers with the same group.id form one group. Each
// consumer sets a group id; every message is consumed by exactly one consumer
// within a group, but may be consumed by multiple groups.
type KafkaConsumerGroup struct {
    // brokers: each Kafka server is one broker
    brokers []string
    // topics: logical groupings of messages; messages of one topic share a queue
    topics            []string
    version           string
    ready             chan bool
    group             string
    channelBufferSize int
    // business callback
    handler func(message *sarama.ConsumerMessage) bool
}

func (k *KafkaConsumerGroup) Init() func() {
    version, err := sarama.ParseKafkaVersion(k.version)
    if err != nil {
        fmt.Printf("Error parsing Kafka version: %v", err)
    }
    cfg := sarama.NewConfig()
    cfg.Version = version
    // Partition assignment strategy.
    cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    // Where to start when no committed offset is found (-2 = oldest).
    cfg.Consumer.Offsets.Initial = -2
    // Channel buffer length.
    cfg.ChannelBufferSize = k.channelBufferSize
    ctx, cancel := context.WithCancel(context.Background())
    client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
    if err != nil {
        fmt.Printf("Error creating consumer group client: %v", err)
    }
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer func() {
            wg.Done()
            //util.HandlePanic("client.Consume panic", log.StandardLogger())
        }()
        for {
            if err := client.Consume(ctx, k.topics, k); err != nil {
                log.Printf("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                log.Println(ctx.Err())
                return
            }
            k.ready = make(chan bool)
        }
    }()
    <-k.ready
    fmt.Printf("Sarama consumer up and running!... \n")
    // Ensure in-flight messages are drained when the process exits.
    return func() {
        cancel()
        wg.Wait()
        if err = client.Close(); err != nil {
            fmt.Printf("Error closing client: %v \n", err)
        }
    }
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready.
    close(k.ready)
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    // Consume the messages here.
    for message := range claim.Messages() {
        if ok := k.handler(message); ok {
            // Commit the offset.
            session.MarkMessage(message, "")
        }
    }
    return nil
}
Test code
func TestKafkaConsumerGroup_Init(t *testing.T) {
    //pd := NewKafkaProducer("test-fail", true)
    k := NewKafkaConsumerGroup([]string{constant.KafkaALiSdkTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
        fmt.Println(string(message.Value))
        // On failure, compensation could look like:
        //if ok := pd.SendMsgSync("666666"); ok {
        //    return true
        //}
        return false
    })
    consumerDone := k.Init()
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    <-sigterm
    fmt.Println("terminating: via signal")
    consumerDone()
}
There is a bit of compensation logic in here: the commented-out lines show how a failed message could be re-produced for a later retry.
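Spelled out, that compensation pattern might look like the following sketch (the process function and both topic names are illustrative assumptions, not code from the article):

package main

import (
    "errors"

    "github.com/Shopify/sarama"
    "library/kafka"
)

// process is a stand-in for the real business handling.
func process(msg *sarama.ConsumerMessage) error {
    if len(msg.Value) == 0 {
        return errors.New("empty message")
    }
    return nil
}

func main() {
    // Park failed messages on a retry topic; return true so the offset is
    // committed only once the message has been safely re-produced.
    pd := kafka.NewKafkaProducer("test-fail", true)
    handler := func(message *sarama.ConsumerMessage) bool {
        if err := process(message); err != nil {
            return pd.SendMsgSync(string(message.Value))
        }
        return true
    }
    k := kafka.NewKafkaConsumerGroup([]string{"app-log"}, "group-2", handler)
    done := k.Init()
    defer done()
    select {} // block forever; in practice, wait on a signal as in the test
}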
Source: https://noshoes.blog.csdn.net/article/details/82909121