0001-Fix-issues-with-tailing-rotated-jsonlog-file.patch

From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
From: Brian Goff <cpuguy83@gmail.com>
Date: Mon, 11 Jul 2016 16:31:42 -0400
Subject: [PATCH] Fix issues with tailing rotated jsonlog file

Fixes a race where the log reader would get events for both an actual
rotation as well as from fsnotify (`fsnotify.Rename`).

This issue becomes extremely apparent when rotations are fast, for
example:

```
$ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2 busybox sh -c 'while true; do echo hello; usleep 100000; done'
```

With this change the log reader for jsonlogs can handle rotations that
happen as above.

Instead of listening for both fs events AND rotation events
simultaneously, potentially meaning we see 2 rotations for only a single
rotation due to channel buffering, only listen for fs events (like
`Rename`) and then wait to be notified about rotation by the logger.

This makes sure that we don't see 2 rotations for 1, and that we don't
start trying to read until the logger is actually ready for us to.
Signed-off-by: Brian Goff <cpuguy83@gmail.com>

This patch is a pending upstream commit fixing broken log tailing. The
original commit can be found in the PR here:

- https://github.com/docker/docker/pull/24514

Signed-off-by: Christian Stewart <christian@paral.in>
---
 daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
 1 file changed, 119 insertions(+), 61 deletions(-)

diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index bea83dd..0cb44af 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -3,11 +3,14 @@ package jsonfilelog
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"io"
"os"
"time"
+ "gopkg.in/fsnotify.v1"
+
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/filenotify"
@@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)
+ // lock so the read stream doesn't get corrupted due to rotations or other log data written while we read
+ // This will block writes!!!
+ l.mu.Lock()
+
pth := l.writer.LogPath()
var files []io.ReadSeeker
for i := l.writer.MaxFiles(); i > 1; i-- {
@@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
+ l.mu.Unlock()
return
}
@@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
if err := latestFile.Close(); err != nil {
logrus.Errorf("Error closing file: %v", err)
}
+ l.mu.Unlock()
return
}
@@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
latestFile.Seek(0, os.SEEK_END)
}
- l.mu.Lock()
l.readers[logWatcher] = struct{}{}
l.mu.Unlock()
@@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
}
}
+func watchFile(name string) (filenotify.FileWatcher, error) {
+ fileWatcher, err := filenotify.New()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := fileWatcher.Add(name); err != nil {
+ logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
+ fileWatcher.Close()
+ fileWatcher = filenotify.NewPollingWatcher()
+
+ if err := fileWatcher.Add(name); err != nil {
+ fileWatcher.Close()
+ logrus.Debugf("error watching log file for modifications: %v", err)
+ return nil, err
+ }
+ }
+ return fileWatcher, nil
+}
+
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
dec := json.NewDecoder(f)
l := &jsonlog.JSONLog{}
- fileWatcher, err := filenotify.New()
+ name := f.Name()
+ fileWatcher, err := watchFile(name)
if err != nil {
logWatcher.Err <- err
+ return
}
defer func() {
f.Close()
fileWatcher.Close()
}()
- name := f.Name()
- if err := fileWatcher.Add(name); err != nil {
- logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
- fileWatcher.Close()
- fileWatcher = filenotify.NewPollingWatcher()
+ var retries int
+ handleRotate := func() error {
+ f.Close()
+ fileWatcher.Remove(name)
+ // retry when the file doesn't exist
+ for retries := 0; retries <= 5; retries++ {
+ f, err = os.Open(name)
+ if err == nil || !os.IsNotExist(err) {
+ break
+ }
+ }
+ if err != nil {
+ return err
+ }
if err := fileWatcher.Add(name); err != nil {
- logrus.Debugf("error watching log file for modifications: %v", err)
- logWatcher.Err <- err
- return
+ return err
}
+ dec = json.NewDecoder(f)
+ return nil
}
- var retries int
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- if err != io.EOF {
- // try again because this shouldn't happen
- if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- dec = json.NewDecoder(f)
- retries++
- continue
+ errRetry := errors.New("retry")
+ errDone := errors.New("done")
+ waitRead := func() error {
+ select {
+ case e := <-fileWatcher.Events():
+ switch e.Op {
+ case fsnotify.Write:
+ dec = json.NewDecoder(f)
+ return nil
+ case fsnotify.Rename, fsnotify.Remove:
+ <-notifyRotate
+ if err := handleRotate(); err != nil {
+ return err
}
-
- // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- // remaining data in the parser's buffer while an io.EOF occurs.
- // If the json logger writes a partial json log entry to the disk
- // while at the same time the decoder tries to decode it, the race condition happens.
- if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
- reader := io.MultiReader(dec.Buffered(), f)
- dec = json.NewDecoder(reader)
- retries++
- continue
+ return nil
+ }
+ return errRetry
+ case err := <-fileWatcher.Errors():
+ logrus.Debugf("logger got error watching file: %v", err)
+ // Something happened, let's try and stay alive and create a new watcher
+ if retries <= 5 {
+ fileWatcher, err = watchFile(name)
+ if err != nil {
+ return err
+ }
-
- return
+ retries++
+ return errRetry
}
+ return err
+ case <-logWatcher.WatchClose():
+ fileWatcher.Remove(name)
+ return errDone
+ }
+ }
- select {
- case <-fileWatcher.Events():
- dec = json.NewDecoder(f)
- continue
- case <-fileWatcher.Errors():
- logWatcher.Err <- err
- return
- case <-logWatcher.WatchClose():
- fileWatcher.Remove(name)
- return
- case <-notifyRotate:
- f.Close()
- fileWatcher.Remove(name)
-
- // retry when the file doesn't exist
- for retries := 0; retries <= 5; retries++ {
- f, err = os.Open(name)
- if err == nil || !os.IsNotExist(err) {
- break
- }
+ handleDecodeErr := func(err error) error {
+ if err == io.EOF {
+ for err := waitRead(); err != nil; {
+ if err == errRetry {
+ // retry the waitRead
+ continue
}
+ return err
+ }
+ return nil
+ }
+ // try again because this shouldn't happen
+ if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+ dec = json.NewDecoder(f)
+ retries++
+ return nil
+ }
+ // io.ErrUnexpectedEOF is returned from json.Decoder when there is
+ // remaining data in the parser's buffer while an io.EOF occurs.
+ // If the json logger writes a partial json log entry to the disk
+ // while at the same time the decoder tries to decode it, the race condition happens.
+ if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+ reader := io.MultiReader(dec.Buffered(), f)
+ dec = json.NewDecoder(reader)
+ retries++
+ return nil
+ }
+ return err
+ }
- if err = fileWatcher.Add(name); err != nil {
- logWatcher.Err <- err
- return
- }
- if err != nil {
- logWatcher.Err <- err
+ // main loop
+ for {
+ msg, err := decodeLogLine(dec, l)
+ if err != nil {
+ if err := handleDecodeErr(err); err != nil {
+ if err == errDone {
return
}
-
- dec = json.NewDecoder(f)
- continue
+ // we got an unrecoverable error, so return
+ logWatcher.Err <- err
+ return
}
+ // ready to try again
+ continue
}
retries = 0 // reset retries since we've succeeded
--
2.7.3