diff --git a/livelog/list.go b/livelog/list.go deleted file mode 100644 index 90b5ee1..0000000 --- a/livelog/list.go +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2022 Drone.IO Inc. All rights reserved. -// Use of this source code is governed by the Polyform License -// that can be found in the LICENSE file. - -// Package livelog provides a Writer that collects pipeline -// output and streams to the central server. - -package livelog - -import ( - "sync" - "time" - - "github.com/drone/drone-go/drone" -) - -type node struct { - drone.Line - next *node -} - -type list struct { - sync.Mutex - - lineCnt int - lineNow time.Time - - size int - limit int - - last *node - - history *node - historyCount int - - pending *node - pendingCount int -} - -func makeList(limit int) *list { - return &list{ - lineCnt: 0, - lineNow: time.Now(), - limit: limit, - } -} - -func (l *list) SetLimit(limit int) { - l.Lock() - l.limit = limit - l.Unlock() -} - -func (l *list) GetLimit() int { - l.Lock() - limit := l.limit - l.Unlock() - return limit -} - -func (l *list) GetSize() int { - l.Lock() - size := l.size - l.Unlock() - return size -} - -func (l *list) Push(p []byte) (overflow bool) { - l.Lock() - for _, part := range split(p) { - line := drone.Line{ - Number: l.lineCnt, - Message: part, - Timestamp: int64(time.Since(l.lineNow).Seconds()), - } - - l.lineNow = time.Now() - l.lineCnt++ - - overflow = overflow || l.push(line) - } - l.Unlock() - - return overflow -} - -func (l *list) push(line drone.Line) bool { - n := &node{ - Line: line, - next: nil, - } - - // put the element to list - - l.size += len(line.Message) - - if l.last != nil { - l.last.next = n - } - l.last = n - - if l.history == nil { - l.history = n - } - l.historyCount++ - - if l.pending == nil { - l.pending = n - } - l.pendingCount++ - - // overflow check - - var overflow bool - - for l.size > l.limit && l.history != nil { - drop := l.history - next := drop.next - - if l.pending == drop { - l.pending = next - l.pendingCount-- - } - - l.history = next - l.historyCount-- - - if l.history == nil { - l.last = nil - } - - l.size -= len(drop.Line.Message) - - overflow = true - } - - return overflow -} - -func (l *list) peekPending() (lines []*drone.Line) { - l.Lock() - lines = toSlice(l.pendingCount, l.pending) - l.Unlock() - return -} - -// Pending returns lines added since the previous call to this method. -func (l *list) Pending() (lines []*drone.Line) { - l.Lock() - lines = toSlice(l.pendingCount, l.pending) - l.pending = nil - l.pendingCount = 0 - l.Unlock() - return -} - -func (l *list) peekHistory() (lines []*drone.Line) { - l.Lock() - lines = toSlice(l.historyCount, l.history) - l.Unlock() - return lines -} - -// History returns full history stored in the buffer and clears the buffer. -func (l *list) History() (lines []*drone.Line) { - l.Lock() - lines = toSlice(l.historyCount, l.history) - l.history = nil - l.historyCount = 0 - l.pending = nil - l.pendingCount = 0 - l.last = nil - l.size = 0 - l.Unlock() - return lines -} - -func toSlice(count int, head *node) []*drone.Line { - if count == 0 { - return nil - } - - lines := make([]*drone.Line, count) - for i, n := 0, head; n != nil; i, n = i+1, n.next { - lines[i] = &n.Line - } - - return lines -} diff --git a/livelog/livelog.go b/livelog/livelog.go index 895c84d..8c42fd3 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -9,62 +9,55 @@ package livelog import ( "context" "strings" + "sync" "time" + "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" ) // defaultLimit is the default maximum log size in bytes. const defaultLimit = 5242880 // 5MB -// Writer is an io.WriteCloser that sends logs to the server. +// Writer is an io.Writer that sends logs to the server. type Writer struct { + sync.Mutex + client client.Client - id int64 + id int64 + num int + now time.Time + size int + limit int interval time.Duration - lineList *list + pending []*drone.Line + history []*drone.Line - stopStreamFn func() - doneStream <-chan struct{} - ready chan struct{} + closed bool + close chan struct{} + ready chan struct{} } -// New returns a new Writer. +// New returns a new Wrtier. func New(client client.Client, id int64) *Writer { - streamCtx, stopStream := context.WithCancel(context.Background()) - b := &Writer{ - client: client, - id: id, - interval: time.Second, - lineList: makeList(defaultLimit), - stopStreamFn: stopStream, - doneStream: streamCtx.Done(), - ready: make(chan struct{}, 1), + client: client, + id: id, + now: time.Now(), + limit: defaultLimit, + interval: time.Second, + close: make(chan struct{}), + ready: make(chan struct{}, 1), } - - // a call to stopStream() stops this goroutine. - // this happens when the Close method is called or after overflow of output data (>limit). go b.start() - return b } // SetLimit sets the Writer limit. func (b *Writer) SetLimit(limit int) { - b.lineList.SetLimit(limit) -} - -// GetLimit returns the Writer limit. -func (b *Writer) GetLimit() int { - return b.lineList.GetLimit() -} - -// GetSize returns amount of output data the Writer currently holds. -func (b *Writer) GetSize() int { - return b.lineList.GetSize() + b.limit = limit } // SetInterval sets the Writer flusher interval. @@ -74,8 +67,31 @@ func (b *Writer) SetInterval(interval time.Duration) { // Write uploads the live log stream to the server. func (b *Writer) Write(p []byte) (n int, err error) { - if isOverLimit := b.lineList.Push(p); isOverLimit { - b.stopStreamFn() + for _, part := range split(p) { + line := &drone.Line{ + Number: b.num, + Message: part, + Timestamp: int64(time.Since(b.now).Seconds()), + } + + for b.size+len(p) > b.limit { + b.stop() // buffer is full, step streaming data + b.size -= len(b.history[0].Message) + b.history = b.history[1:] + } + + b.size = b.size + len(part) + b.num++ + + if b.stopped() == false { + b.Lock() + b.pending = append(b.pending, line) + b.Unlock() + } + + b.Lock() + b.history = append(b.history, line) + b.Unlock() } select { @@ -86,48 +102,78 @@ func (b *Writer) Write(p []byte) (n int, err error) { return len(p), nil } -// Close closes the writer and uploads the full contents to the server. +// Close closes the writer and uploads the full contents to +// the server. func (b *Writer) Close() error { - select { - case <-b.doneStream: - default: - b.stopStreamFn() - _ = b.flush() // send all pending lines + if b.stop() { + b.flush() } - - return b.upload() // upload full log history + return b.upload() } // upload uploads the full log history to the server. func (b *Writer) upload() error { - return b.client.Upload(context.Background(), b.id, b.lineList.History()) + return b.client.Upload( + context.Background(), b.id, b.history) } // flush batch uploads all buffered logs to the server. func (b *Writer) flush() error { - lines := b.lineList.Pending() + b.Lock() + lines := b.copy() + b.clear() + b.Unlock() if len(lines) == 0 { return nil } + return b.client.Batch( + context.Background(), b.id, lines) +} + +// copy returns a copy of the buffered lines. +func (b *Writer) copy() []*drone.Line { + return append(b.pending[:0:0], b.pending...) +} + +// clear clears the buffer. +func (b *Writer) clear() { + b.pending = b.pending[:0] +} + +func (b *Writer) stop() bool { + b.Lock() + var closed bool + if b.closed == false { + close(b.close) + closed = true + b.closed = true + } + b.Unlock() + return closed +} - return b.client.Batch(context.Background(), b.id, lines) +func (b *Writer) stopped() bool { + b.Lock() + closed := b.closed + b.Unlock() + return closed } -func (b *Writer) start() { +func (b *Writer) start() error { for { select { - case <-b.doneStream: - return + case <-b.close: + return nil case <-b.ready: select { - case <-b.doneStream: - return + case <-b.close: + return nil case <-time.After(b.interval): // we intentionally ignore errors. log streams - // are ephemeral and are considered low priority + // are ephemeral and are considered low prioirty // because they are not required for drone to // operator, and the impact of failure is minimal - _ = b.flush() + b.flush() } } } diff --git a/livelog/livelog_test.go b/livelog/livelog_test.go index 3f3db7d..e3939e6 100644 --- a/livelog/livelog_test.go +++ b/livelog/livelog_test.go @@ -9,242 +9,80 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" "github.com/google/go-cmp/cmp" ) -var optNoTS = cmpopts.IgnoreFields(drone.Line{}, "Timestamp") - -// TestLineWriterClose tests if closing the Writer triggers streaming of all pending lines and upload of the full history. -func TestLineWriterClose(t *testing.T) { - c := newMockClient() - w := New(c, 1) - w.SetInterval(time.Hour) // make sure it does not stream data - w.lineList.lineCnt = 4 // new lines are starting from the Number=4 - +func TestLineWriterSingle(t *testing.T) { + client := new(mockClient) + w := New(client, 1) + w.SetInterval(time.Duration(0)) + w.num = 4 w.Write([]byte("foo\nbar\n")) - a := w.lineList.peekPending() + a := w.pending b := []*drone.Line{ {Number: 4, Message: "foo\n"}, {Number: 5, Message: "bar\n"}, {Number: 6, Message: ""}, } - if diff := cmp.Diff(a, b, optNoTS); diff != "" { + if diff := cmp.Diff(a, b); diff != "" { t.Fail() t.Log(diff) } - if len(c.uploaded) != 0 || len(c.lines) != 0 { - t.Error("Expected nothing has been streamed or uploaded") - } - w.Close() - - if diff := cmp.Diff(c.lines, b, optNoTS); diff != "" { - t.Error("Expected all output has been streamed") - t.Log(diff) - } - - if diff := cmp.Diff(c.uploaded, b, optNoTS); diff != "" { - t.Error("Expected all output has been uploaded") - t.Log(diff) - } - - if len(w.lineList.peekPending()) > 0 { - t.Errorf("Expect empty buffer") - } -} - -// TestLineWriterStreaming tests if streaming is done correctly through a client. -func TestLineWriterStreaming(t *testing.T) { - c := newMockClient() - w := New(c, 1) - w.SetInterval(time.Nanosecond) - - w.Write([]byte("foo")) - c.waitUpload() - - var a, b []*drone.Line - - a = w.lineList.peekPending() - if len(a) != 0 { - t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a)) - } - - a = c.lines - b = []*drone.Line{{Number: 0, Message: "foo"}} - if diff := cmp.Diff(a, b, optNoTS); diff != "" { + a = client.uploaded + if diff := cmp.Diff(a, b); diff != "" { t.Fail() t.Log(diff) } - w.Write([]byte("bar")) - c.waitUpload() - - a = w.lineList.peekPending() - if len(a) != 0 { - t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a)) - } - - a = c.lines - b = []*drone.Line{{Number: 0, Message: "foo"}, {Number: 1, Message: "bar"}} - if diff := cmp.Diff(a, b, optNoTS); diff != "" { - t.Fail() - t.Log(diff) - } - - w.Close() - - a = c.uploaded - if diff := cmp.Diff(a, b, optNoTS); diff != "" { - t.Fail() - t.Log(diff) + if len(w.pending) > 0 { + t.Errorf("Expect empty buffer") } } -// TestLineWriterLimit tests if the history contains only last uploaded content after the limit has been breached. func TestLineWriterLimit(t *testing.T) { - c := newMockClient() - - w := New(c, 0) - if got, want := w.GetLimit(), defaultLimit; got != want { + client := new(mockClient) + w := New(client, 0) + if got, want := w.limit, defaultLimit; got != want { t.Errorf("Expect default buffer limit %d, got %d", want, got) } - w.SetLimit(6) - - if got, want := w.GetLimit(), 6; got != want { + if got, want := w.limit, 6; got != want { t.Errorf("Expect custom buffer limit %d, got %d", want, got) } w.Write([]byte("foo")) w.Write([]byte("bar")) - w.Write([]byte("baz")) // this write overflows the buffer, so "foo" is removed from the history + w.Write([]byte("baz")) - if got, want := w.GetSize(), 6; got != want { + if got, want := w.size, 6; got != want { t.Errorf("Expect buffer size %d, got %d", want, got) } - a := w.lineList.peekHistory() - b := []*drone.Line{{Number: 1, Message: "bar"}, {Number: 2, Message: "baz"}} - if diff := cmp.Diff(a, b, optNoTS); diff != "" { - t.Fail() - t.Log(diff) - } - - w.Write([]byte("boss")) // "boss" and "baz" are 7 bytes, so "bar" and "baz" are removed - - a = w.lineList.peekHistory() - b = []*drone.Line{{Number: 3, Message: "boss"}} - if diff := cmp.Diff(a, b, optNoTS); diff != "" { - t.Fail() - t.Log(diff) + a := w.history + b := []*drone.Line{ + {Number: 1, Message: "bar"}, + {Number: 2, Message: "baz"}, } - - w.Write([]byte("xy")) // this "xy" fits in the buffer so nothing should be removed now - - a = w.lineList.peekHistory() - b = []*drone.Line{{Number: 3, Message: "boss"}, {Number: 4, Message: "xy"}} - if diff := cmp.Diff(a, b, optNoTS); diff != "" { + if diff := cmp.Diff(a, b); diff != "" { t.Fail() t.Log(diff) } - - w.Close() -} - -// TestLineWriterLimitStopStreaming tests if streaming has been stopped after the buffer overflow. -func TestLineWriterLimitStopStreaming(t *testing.T) { - c := newMockClient() - w := New(c, 0) - w.SetLimit(8) - w.SetInterval(time.Nanosecond) - - w.Write([]byte("foo")) - if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 1 { - t.Errorf("Expected %d lines streamed, got %d", 1, len(c.lines)) - } - - w.Write([]byte("bar")) - if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 2 { - t.Errorf("Expected %d lines streamed, got %d", 2, len(c.lines)) - } - - w.Write([]byte("baz")) // overflow! streaming should be aborted - if uploaded := c.waitUpload(); uploaded || len(c.lines) != 2 { - t.Errorf("Expected streaming has been stopped. Streamed %d lines, expected %d", len(c.lines), 2) - } - - w.Close() - - if len(c.lines) != 2 { - t.Errorf("Closing should not trigged output streaming. Streamed %d lines, expected %d", len(c.lines), 2) - } -} - -// TestLineWriterOverLimit tests weird situation when data is written in chunks that exceed the limit. -func TestLineWriterOverLimit(t *testing.T) { - c := newMockClient() - - w := New(c, 0) - w.SetLimit(4) - - w.Write([]byte("foobar")) // over the limit, nothing should be written - - if got, want := w.GetSize(), 0; got != want { - t.Errorf("Expect buffer size %d, got %d", want, got) - } - - w.Close() - - if len(c.uploaded) != 0 { - t.Error("there should be no uploaded lines") - } -} - -func BenchmarkWriter_Write(b *testing.B) { - b.ReportAllocs() - c := &dummyClient{} - w := New(c, 0) - p := []byte("Lorem ipsum dolor sit amet,\nconsectetur adipiscing elit,\nsed do eiusmod tempor incididunt\nut labore et dolore magna aliqua.\n") - for i := 0; i < b.N; i++ { - w.Write(p) - } - w.Close() } type mockClient struct { client.Client - uploadDone chan struct{} - lines []*drone.Line - uploaded []*drone.Line -} - -func newMockClient() *mockClient { - return &mockClient{uploadDone: make(chan struct{}, 1)} -} - -// waitUpload waits a while for streaming to complete. Writer's interval should be set to very low value before this call. -func (m *mockClient) waitUpload() bool { - select { - case <-m.uploadDone: - return true - case <-time.After(10 * time.Millisecond): - return false - } + lines []*drone.Line + uploaded []*drone.Line } func (m *mockClient) Batch(ctx context.Context, id int64, lines []*drone.Line) error { m.lines = append(m.lines, lines...) - select { - case m.uploadDone <- struct{}{}: - default: - } return nil } @@ -252,8 +90,3 @@ func (m *mockClient) Upload(ctx context.Context, id int64, lines []*drone.Line) m.uploaded = lines return nil } - -type dummyClient struct{ client.Client } - -func (m *dummyClient) Batch(context.Context, int64, []*drone.Line) error { return nil } -func (m *dummyClient) Upload(context.Context, int64, []*drone.Line) error { return nil }