| From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001 |
| From: Brian Goff <cpuguy83@gmail.com> |
| Date: Mon, 11 Jul 2016 16:31:42 -0400 |
| Subject: [PATCH] Fix issues with tailing rotated jsonlog file |
| |
| Fixes a race where the log reader would get events for both an actual |
| rotation as well as from fsnotify (`fsnotify.Rename`). |
| This issue becomes extremely apparent when rotations are fast, for |
| example: |
| |
| ``` |
| $ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2 |
| busybox sh -c 'while true; do echo hello; usleep 100000; done' |
| ``` |
| |
| With this change the log reader for jsonlogs can handle rotations that |
| happen as above. |
| |
| Instead of listening for both fs events AND rotation events |
| simultaneously, potentially meaning we see 2 rotations for only a single |
| rotation due to channel buffering, only listen for fs events (like |
| `Rename`) and then wait to be notified about rotation by the logger. |
| This makes sure that we don't see 2 rotations for 1, and that we don't |
| start trying to read until the logger is actually ready for us to. |
| |
| Signed-off-by: Brian Goff <cpuguy83@gmail.com> |
| |
| This commit is a backport of a pending upstream commit fixing broken |
| log tailing. The original commit can be found in the PR here: |
| |
| - https://github.com/docker/docker/pull/24514 |
| |
| Signed-off-by: Christian Stewart <christian@paral.in> |
| --- |
| daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++------------- |
| 1 file changed, 119 insertions(+), 61 deletions(-) |
| |
| diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go |
| index bea83dd..0cb44af 100644 |
| --- a/daemon/logger/jsonfilelog/read.go |
| +++ b/daemon/logger/jsonfilelog/read.go |
| @@ -3,11 +3,14 @@ package jsonfilelog |
| import ( |
| "bytes" |
| "encoding/json" |
| + "errors" |
| "fmt" |
| "io" |
| "os" |
| "time" |
| |
| + "gopkg.in/fsnotify.v1" |
| + |
| "github.com/Sirupsen/logrus" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/pkg/filenotify" |
| @@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { |
| func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) { |
| defer close(logWatcher.Msg) |
| |
| + // lock so the read stream doesn't get corrupted due to rotations or other log data written while we read |
| + // This will block writes!!! |
| + l.mu.Lock() |
| + |
| pth := l.writer.LogPath() |
| var files []io.ReadSeeker |
| for i := l.writer.MaxFiles(); i > 1; i-- { |
| @@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R |
| latestFile, err := os.Open(pth) |
| if err != nil { |
| logWatcher.Err <- err |
| + l.mu.Unlock() |
| return |
| } |
| |
| @@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R |
| if err := latestFile.Close(); err != nil { |
| logrus.Errorf("Error closing file: %v", err) |
| } |
| + l.mu.Unlock() |
| return |
| } |
| |
| @@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R |
| latestFile.Seek(0, os.SEEK_END) |
| } |
| |
| - l.mu.Lock() |
| l.readers[logWatcher] = struct{}{} |
| l.mu.Unlock() |
| |
| @@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti |
| } |
| } |
| |
| +func watchFile(name string) (filenotify.FileWatcher, error) { |
| + fileWatcher, err := filenotify.New() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + if err := fileWatcher.Add(name); err != nil { |
| + logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err) |
| + fileWatcher.Close() |
| + fileWatcher = filenotify.NewPollingWatcher() |
| + |
| + if err := fileWatcher.Add(name); err != nil { |
| + fileWatcher.Close() |
| + logrus.Debugf("error watching log file for modifications: %v", err) |
| + return nil, err |
| + } |
| + } |
| + return fileWatcher, nil |
| +} |
| + |
| func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) { |
| dec := json.NewDecoder(f) |
| l := &jsonlog.JSONLog{} |
| |
| - fileWatcher, err := filenotify.New() |
| + name := f.Name() |
| + fileWatcher, err := watchFile(name) |
| if err != nil { |
| logWatcher.Err <- err |
| + return |
| } |
| defer func() { |
| f.Close() |
| fileWatcher.Close() |
| }() |
| - name := f.Name() |
| |
| - if err := fileWatcher.Add(name); err != nil { |
| - logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err) |
| - fileWatcher.Close() |
| - fileWatcher = filenotify.NewPollingWatcher() |
| + var retries int |
| + handleRotate := func() error { |
| + f.Close() |
| + fileWatcher.Remove(name) |
| |
| + // retry when the file doesn't exist |
| + for retries := 0; retries <= 5; retries++ { |
| + f, err = os.Open(name) |
| + if err == nil || !os.IsNotExist(err) { |
| + break |
| + } |
| + } |
| + if err != nil { |
| + return err |
| + } |
| if err := fileWatcher.Add(name); err != nil { |
| - logrus.Debugf("error watching log file for modifications: %v", err) |
| - logWatcher.Err <- err |
| - return |
| + return err |
| } |
| + dec = json.NewDecoder(f) |
| + return nil |
| } |
| |
| - var retries int |
| - for { |
| - msg, err := decodeLogLine(dec, l) |
| - if err != nil { |
| - if err != io.EOF { |
| - // try again because this shouldn't happen |
| - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { |
| - dec = json.NewDecoder(f) |
| - retries++ |
| - continue |
| + errRetry := errors.New("retry") |
| + errDone := errors.New("done") |
| + waitRead := func() error { |
| + select { |
| + case e := <-fileWatcher.Events(): |
| + switch e.Op { |
| + case fsnotify.Write: |
| + dec = json.NewDecoder(f) |
| + return nil |
| + case fsnotify.Rename, fsnotify.Remove: |
| + <-notifyRotate |
| + if err := handleRotate(); err != nil { |
| + return err |
| } |
| - |
| - // io.ErrUnexpectedEOF is returned from json.Decoder when there is |
| - // remaining data in the parser's buffer while an io.EOF occurs. |
| - // If the json logger writes a partial json log entry to the disk |
| - // while at the same time the decoder tries to decode it, the race condition happens. |
| - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { |
| - reader := io.MultiReader(dec.Buffered(), f) |
| - dec = json.NewDecoder(reader) |
| - retries++ |
| - continue |
| + return nil |
| + } |
| + return errRetry |
| + case err := <-fileWatcher.Errors(): |
| + logrus.Debugf("logger got error watching file: %v", err) |
| + // Something happened, let's try and stay alive and create a new watcher |
| + if retries <= 5 { |
| + fileWatcher, err = watchFile(name) |
| + if err != nil { |
| + return err |
| } |
| - |
| - return |
| + retries++ |
| + return errRetry |
| } |
| + return err |
| + case <-logWatcher.WatchClose(): |
| + fileWatcher.Remove(name) |
| + return errDone |
| + } |
| + } |
| |
| - select { |
| - case <-fileWatcher.Events(): |
| - dec = json.NewDecoder(f) |
| - continue |
| - case <-fileWatcher.Errors(): |
| - logWatcher.Err <- err |
| - return |
| - case <-logWatcher.WatchClose(): |
| - fileWatcher.Remove(name) |
| - return |
| - case <-notifyRotate: |
| - f.Close() |
| - fileWatcher.Remove(name) |
| - |
| - // retry when the file doesn't exist |
| - for retries := 0; retries <= 5; retries++ { |
| - f, err = os.Open(name) |
| - if err == nil || !os.IsNotExist(err) { |
| - break |
| - } |
| + handleDecodeErr := func(err error) error { |
| + if err == io.EOF { |
| + for err := waitRead(); err != nil; err = waitRead() { |
| + if err == errRetry { |
| + // retry the waitRead |
| + continue |
| } |
| + return err |
| + } |
| + return nil |
| + } |
| + // try again because this shouldn't happen |
| + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry { |
| + dec = json.NewDecoder(f) |
| + retries++ |
| + return nil |
| + } |
| + // io.ErrUnexpectedEOF is returned from json.Decoder when there is |
| + // remaining data in the parser's buffer while an io.EOF occurs. |
| + // If the json logger writes a partial json log entry to the disk |
| + // while at the same time the decoder tries to decode it, the race condition happens. |
| + if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry { |
| + reader := io.MultiReader(dec.Buffered(), f) |
| + dec = json.NewDecoder(reader) |
| + retries++ |
| + return nil |
| + } |
| + return err |
| + } |
| |
| - if err = fileWatcher.Add(name); err != nil { |
| - logWatcher.Err <- err |
| - return |
| - } |
| - if err != nil { |
| - logWatcher.Err <- err |
| + // main loop |
| + for { |
| + msg, err := decodeLogLine(dec, l) |
| + if err != nil { |
| + if err := handleDecodeErr(err); err != nil { |
| + if err == errDone { |
| return |
| } |
| - |
| - dec = json.NewDecoder(f) |
| - continue |
| + // we got an unrecoverable error, so return |
| + logWatcher.Err <- err |
| + return |
| } |
| + // ready to try again |
| + continue |
| } |
| |
| retries = 0 // reset retries since we've succeeded |
| -- |
| 2.7.3 |
| |