blob: 413cfd6f7b734f1c50fb34e5d5274ceb525a9d42 [file] [log] [blame]
From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
From: Brian Goff <cpuguy83@gmail.com>
Date: Mon, 11 Jul 2016 16:31:42 -0400
Subject: [PATCH] Fix issues with tailing rotated jsonlog file
Fixes a race where the log reader would get events for both an actual
rotation as we from fsnotify (`fsnotify.Rename`).
This issue becomes extremely apparent when rotations are fast, for
example:
```
$ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2
busybox sh -c 'while true; do echo hello; usleep 100000; done'
```
With this change the log reader for jsonlogs can handle rotations that
happen as above.
Instead of listening for both fs events AND rotation events
simultaneously, potentially meaning we see 2 rotations for only a single
rotation due to channel buffering, only listen for fs events (like
`Rename`) and then wait to be notified about rotation by the logger.
This makes sure that we don't see 2 rotations for 1, and that we don't
start trying to read until the logger is actually ready for us to.
Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is pending upstream commit fixing broken log tailing. The
original commit can be found in the PR here:
- https://github.com/docker/docker/pull/24514
Signed-off-by: Christian Stewart <christian@paral.in>
---
daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
1 file changed, 119 insertions(+), 61 deletions(-)
diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index bea83dd..0cb44af 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -3,11 +3,14 @@ package jsonfilelog
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"io"
"os"
"time"
+ "gopkg.in/fsnotify.v1"
+
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)
+ // lock so the read stream doesn't get corrupted do to rotations or other log data written while we read
+ // This will block writes!!!
+ l.mu.Lock()
+
pth := l.writer.LogPath()
var files []io.ReadSeeker
for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
+ l.mu.Unlock()
return
}
@@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
if err := latestFile.Close(); err != nil {
logrus.Errorf("Error closing file: %v", err)
}
+ l.mu.Unlock()
return
}
@@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
latestFile.Seek(0, os.SEEK_END)
}
- l.mu.Lock()
l.readers[logWatcher] = struct{}{}
l.mu.Unlock()
@@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
}
}
+func watchFile(name string) (filenotify.FileWatcher, error) {
+ fileWatcher, err := filenotify.New()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := fileWatcher.Add(name); err != nil {
+ logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+ fileWatcher.Close()
+ fileWatcher = filenotify.NewPollingWatcher()
+
+ if err := fileWatcher.Add(name); err != nil {
+ fileWatcher.Close()
+ logrus.Debugf("error watching log file for modifications: %v", err)
+ return nil, err
+ }
+ }
+ return fileWatcher, nil
+}
+
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
dec := json.NewDecoder(f)
l := &jsonlog.JSONLog{}
- fileWatcher, err := filenotify.New()
+ name := f.Name()
+ fileWatcher, err := watchFile(name)
if err != nil {
logWatcher.Err <- err
+ return
}
defer func() {
f.Close()
fileWatcher.Close()
}()
- name := f.Name()
- if err := fileWatcher.Add(name); err != nil {
- logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
- fileWatcher.Close()
- fileWatcher = filenotify.NewPollingWatcher()
+ var retries int
+ handleRotate := func() error {
+ f.Close()
+ fileWatcher.Remove(name)
+ // retry when the file doesn't exist
+ for retries := 0; retries <= 5; retries++ {
+ f, err = os.Open(name)
+ if err == nil || !os.IsNotExist(err) {
+ break
+ }
+ }
+ if err != nil {
+ return err
+ }
if err := fileWatcher.Add(name); err != nil {
- logrus.Debugf("error watching log file for modifications: %v", err)
- logWatcher.Err <- err
- return
+ return err
}
+ dec = json.NewDecoder(f)
+ return nil
}
- var retries int
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- if err != io.EOF {
- // try again because this shouldn't happen
- if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- dec = json.NewDecoder(f)
- retries++
- continue
+ errRetry := errors.New("retry")
+ errDone := errors.New("done")
+ waitRead := func() error {
+ select {
+ case e := <-fileWatcher.Events():
+ switch e.Op {
+ case fsnotify.Write:
+ dec = json.NewDecoder(f)
+ return nil
+ case fsnotify.Rename, fsnotify.Remove:
+ <-notifyRotate
+ if err := handleRotate(); err != nil {
+ return err
}
-
- // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- // remaining data in the parser's buffer while an io.EOF occurs.
- // If the json logger writes a partial json log entry to the disk
- // while at the same time the decoder tries to decode it, the race condition happens.
- if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
- reader := io.MultiReader(dec.Buffered(), f)
- dec = json.NewDecoder(reader)
- retries++
- continue
+ return nil
+ }
+ return errRetry
+ case err := <-fileWatcher.Errors():
+ logrus.Debug("logger got error watching file: %v", err)
+ // Something happened, let's try and stay alive and create a new watcher
+ if retries <= 5 {
+ fileWatcher, err = watchFile(name)
+ if err != nil {
+ return err
}
-
- return
+ retries++
+ return errRetry
}
+ return err
+ case <-logWatcher.WatchClose():
+ fileWatcher.Remove(name)
+ return errDone
+ }
+ }
- select {
- case <-fileWatcher.Events():
- dec = json.NewDecoder(f)
- continue
- case <-fileWatcher.Errors():
- logWatcher.Err <- err
- return
- case <-logWatcher.WatchClose():
- fileWatcher.Remove(name)
- return
- case <-notifyRotate:
- f.Close()
- fileWatcher.Remove(name)
-
- // retry when the file doesn't exist
- for retries := 0; retries <= 5; retries++ {
- f, err = os.Open(name)
- if err == nil || !os.IsNotExist(err) {
- break
- }
+ handleDecodeErr := func(err error) error {
+ if err == io.EOF {
+ for err := waitRead(); err != nil; {
+ if err == errRetry {
+ // retry the waitRead
+ continue
}
+ return err
+ }
+ return nil
+ }
+ // try again because this shouldn't happen
+ if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+ dec = json.NewDecoder(f)
+ retries++
+ return nil
+ }
+ // io.ErrUnexpectedEOF is returned from json.Decoder when there is
+ // remaining data in the parser's buffer while an io.EOF occurs.
+ // If the json logger writes a partial json log entry to the disk
+ // while at the same time the decoder tries to decode it, the race condition happens.
+ if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+ reader := io.MultiReader(dec.Buffered(), f)
+ dec = json.NewDecoder(reader)
+ retries++
+ return nil
+ }
+ return err
+ }
- if err = fileWatcher.Add(name); err != nil {
- logWatcher.Err <- err
- return
- }
- if err != nil {
- logWatcher.Err <- err
+ // main loop
+ for {
+ msg, err := decodeLogLine(dec, l)
+ if err != nil {
+ if err := handleDecodeErr(err); err != nil {
+ if err == errDone {
return
}
-
- dec = json.NewDecoder(f)
- continue
+ // we got an unrecoverable error, so return
+ logWatcher.Err <- err
+ return
}
+ // ready to try again
+ continue
}
retries = 0 // reset retries since we've succeeded
--
2.7.3