From 951bc461acec4ba65a2b96c3cdb1fd384fe68c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Thu, 3 Mar 2022 16:43:54 +0100 Subject: [PATCH] Optimized livelog streaming --- livelog/list.go | 189 +++++++++++++++++++++++++++++++++++ livelog/livelog.go | 148 ++++++++++----------------- livelog/livelog_test.go | 215 +++++++++++++++++++++++++++++++++++----- 3 files changed, 431 insertions(+), 121 deletions(-) create mode 100644 livelog/list.go diff --git a/livelog/list.go b/livelog/list.go new file mode 100644 index 0000000..90b5ee1 --- /dev/null +++ b/livelog/list.go @@ -0,0 +1,189 @@ +// Copyright 2022 Drone.IO Inc. All rights reserved. +// Use of this source code is governed by the Polyform License +// that can be found in the LICENSE file. + +// Package livelog provides a Writer that collects pipeline +// output and streams to the central server. + +package livelog + +import ( + "sync" + "time" + + "github.com/drone/drone-go/drone" +) + +type node struct { + drone.Line + next *node +} + +type list struct { + sync.Mutex + + lineCnt int + lineNow time.Time + + size int + limit int + + last *node + + history *node + historyCount int + + pending *node + pendingCount int +} + +func makeList(limit int) *list { + return &list{ + lineCnt: 0, + lineNow: time.Now(), + limit: limit, + } +} + +func (l *list) SetLimit(limit int) { + l.Lock() + l.limit = limit + l.Unlock() +} + +func (l *list) GetLimit() int { + l.Lock() + limit := l.limit + l.Unlock() + return limit +} + +func (l *list) GetSize() int { + l.Lock() + size := l.size + l.Unlock() + return size +} + +func (l *list) Push(p []byte) (overflow bool) { + l.Lock() + for _, part := range split(p) { + line := drone.Line{ + Number: l.lineCnt, + Message: part, + Timestamp: int64(time.Since(l.lineNow).Seconds()), + } + + l.lineNow = time.Now() + l.lineCnt++ + + overflow = overflow || l.push(line) + } + l.Unlock() + + return overflow +} + +func (l *list) push(line drone.Line) bool { + n := &node{ + Line: line, + next: nil, + } + + // put the element to list + + l.size += len(line.Message) + + if l.last != nil { + l.last.next = n + } + l.last = n + + if l.history == nil { + l.history = n + } + l.historyCount++ + + if l.pending == nil { + l.pending = n + } + l.pendingCount++ + + // overflow check + + var overflow bool + + for l.size > l.limit && l.history != nil { + drop := l.history + next := drop.next + + if l.pending == drop { + l.pending = next + l.pendingCount-- + } + + l.history = next + l.historyCount-- + + if l.history == nil { + l.last = nil + } + + l.size -= len(drop.Line.Message) + + overflow = true + } + + return overflow +} + +func (l *list) peekPending() (lines []*drone.Line) { + l.Lock() + lines = toSlice(l.pendingCount, l.pending) + l.Unlock() + return +} + +// Pending returns lines added since the previous call to this method. +func (l *list) Pending() (lines []*drone.Line) { + l.Lock() + lines = toSlice(l.pendingCount, l.pending) + l.pending = nil + l.pendingCount = 0 + l.Unlock() + return +} + +func (l *list) peekHistory() (lines []*drone.Line) { + l.Lock() + lines = toSlice(l.historyCount, l.history) + l.Unlock() + return lines +} + +// History returns full history stored in the buffer and clears the buffer. +func (l *list) History() (lines []*drone.Line) { + l.Lock() + lines = toSlice(l.historyCount, l.history) + l.history = nil + l.historyCount = 0 + l.pending = nil + l.pendingCount = 0 + l.last = nil + l.size = 0 + l.Unlock() + return lines +} + +func toSlice(count int, head *node) []*drone.Line { + if count == 0 { + return nil + } + + lines := make([]*drone.Line, count) + for i, n := 0, head; n != nil; i, n = i+1, n.next { + lines[i] = &n.Line + } + + return lines +} diff --git a/livelog/livelog.go b/livelog/livelog.go index 8c42fd3..895c84d 100644 --- a/livelog/livelog.go +++ b/livelog/livelog.go @@ -9,55 +9,62 @@ package livelog import ( "context" "strings" - "sync" "time" - "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" ) // defaultLimit is the default maximum log size in bytes. const defaultLimit = 5242880 // 5MB -// Writer is an io.Writer that sends logs to the server. +// Writer is an io.WriteCloser that sends logs to the server. type Writer struct { - sync.Mutex - client client.Client - id int64 - num int - now time.Time - size int - limit int + id int64 interval time.Duration - pending []*drone.Line - history []*drone.Line + lineList *list - closed bool - close chan struct{} - ready chan struct{} + stopStreamFn func() + doneStream <-chan struct{} + ready chan struct{} } -// New returns a new Wrtier. +// New returns a new Writer. func New(client client.Client, id int64) *Writer { + streamCtx, stopStream := context.WithCancel(context.Background()) + b := &Writer{ - client: client, - id: id, - now: time.Now(), - limit: defaultLimit, - interval: time.Second, - close: make(chan struct{}), - ready: make(chan struct{}, 1), + client: client, + id: id, + interval: time.Second, + lineList: makeList(defaultLimit), + stopStreamFn: stopStream, + doneStream: streamCtx.Done(), + ready: make(chan struct{}, 1), } + + // a call to stopStream() stops this goroutine. + // this happens when the Close method is called or after overflow of output data (>limit). go b.start() + return b } // SetLimit sets the Writer limit. func (b *Writer) SetLimit(limit int) { - b.limit = limit + b.lineList.SetLimit(limit) +} + +// GetLimit returns the Writer limit. +func (b *Writer) GetLimit() int { + return b.lineList.GetLimit() +} + +// GetSize returns amount of output data the Writer currently holds. +func (b *Writer) GetSize() int { + return b.lineList.GetSize() } // SetInterval sets the Writer flusher interval. @@ -67,31 +74,8 @@ func (b *Writer) SetInterval(interval time.Duration) { // Write uploads the live log stream to the server. func (b *Writer) Write(p []byte) (n int, err error) { - for _, part := range split(p) { - line := &drone.Line{ - Number: b.num, - Message: part, - Timestamp: int64(time.Since(b.now).Seconds()), - } - - for b.size+len(p) > b.limit { - b.stop() // buffer is full, step streaming data - b.size -= len(b.history[0].Message) - b.history = b.history[1:] - } - - b.size = b.size + len(part) - b.num++ - - if b.stopped() == false { - b.Lock() - b.pending = append(b.pending, line) - b.Unlock() - } - - b.Lock() - b.history = append(b.history, line) - b.Unlock() + if isOverLimit := b.lineList.Push(p); isOverLimit { + b.stopStreamFn() } select { @@ -102,78 +86,48 @@ func (b *Writer) Write(p []byte) (n int, err error) { return len(p), nil } -// Close closes the writer and uploads the full contents to -// the server. +// Close closes the writer and uploads the full contents to the server. func (b *Writer) Close() error { - if b.stop() { - b.flush() + select { + case <-b.doneStream: + default: + b.stopStreamFn() + _ = b.flush() // send all pending lines } - return b.upload() + + return b.upload() // upload full log history } // upload uploads the full log history to the server. func (b *Writer) upload() error { - return b.client.Upload( - context.Background(), b.id, b.history) + return b.client.Upload(context.Background(), b.id, b.lineList.History()) } // flush batch uploads all buffered logs to the server. func (b *Writer) flush() error { - b.Lock() - lines := b.copy() - b.clear() - b.Unlock() + lines := b.lineList.Pending() if len(lines) == 0 { return nil } - return b.client.Batch( - context.Background(), b.id, lines) -} - -// copy returns a copy of the buffered lines. -func (b *Writer) copy() []*drone.Line { - return append(b.pending[:0:0], b.pending...) -} - -// clear clears the buffer. -func (b *Writer) clear() { - b.pending = b.pending[:0] -} - -func (b *Writer) stop() bool { - b.Lock() - var closed bool - if b.closed == false { - close(b.close) - closed = true - b.closed = true - } - b.Unlock() - return closed -} -func (b *Writer) stopped() bool { - b.Lock() - closed := b.closed - b.Unlock() - return closed + return b.client.Batch(context.Background(), b.id, lines) } -func (b *Writer) start() error { +func (b *Writer) start() { for { select { - case <-b.close: - return nil + case <-b.doneStream: + return case <-b.ready: select { - case <-b.close: - return nil + case <-b.doneStream: + return case <-time.After(b.interval): // we intentionally ignore errors. log streams - // are ephemeral and are considered low prioirty + // are ephemeral and are considered low priority // because they are not required for drone to // operator, and the impact of failure is minimal - b.flush() + _ = b.flush() } } } diff --git a/livelog/livelog_test.go b/livelog/livelog_test.go index e3939e6..3f3db7d 100644 --- a/livelog/livelog_test.go +++ b/livelog/livelog_test.go @@ -9,80 +9,242 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" "github.com/google/go-cmp/cmp" ) -func TestLineWriterSingle(t *testing.T) { - client := new(mockClient) - w := New(client, 1) - w.SetInterval(time.Duration(0)) - w.num = 4 +var optNoTS = cmpopts.IgnoreFields(drone.Line{}, "Timestamp") + +// TestLineWriterClose tests if closing the Writer triggers streaming of all pending lines and upload of the full history. +func TestLineWriterClose(t *testing.T) { + c := newMockClient() + w := New(c, 1) + w.SetInterval(time.Hour) // make sure it does not stream data + w.lineList.lineCnt = 4 // new lines are starting from the Number=4 + w.Write([]byte("foo\nbar\n")) - a := w.pending + a := w.lineList.peekPending() b := []*drone.Line{ {Number: 4, Message: "foo\n"}, {Number: 5, Message: "bar\n"}, {Number: 6, Message: ""}, } - if diff := cmp.Diff(a, b); diff != "" { + if diff := cmp.Diff(a, b, optNoTS); diff != "" { t.Fail() t.Log(diff) } + if len(c.uploaded) != 0 || len(c.lines) != 0 { + t.Error("Expected nothing has been streamed or uploaded") + } + w.Close() - a = client.uploaded - if diff := cmp.Diff(a, b); diff != "" { - t.Fail() + + if diff := cmp.Diff(c.lines, b, optNoTS); diff != "" { + t.Error("Expected all output has been streamed") t.Log(diff) } - if len(w.pending) > 0 { + if diff := cmp.Diff(c.uploaded, b, optNoTS); diff != "" { + t.Error("Expected all output has been uploaded") + t.Log(diff) + } + + if len(w.lineList.peekPending()) > 0 { t.Errorf("Expect empty buffer") } } +// TestLineWriterStreaming tests if streaming is done correctly through a client. +func TestLineWriterStreaming(t *testing.T) { + c := newMockClient() + w := New(c, 1) + w.SetInterval(time.Nanosecond) + + w.Write([]byte("foo")) + c.waitUpload() + + var a, b []*drone.Line + + a = w.lineList.peekPending() + if len(a) != 0 { + t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a)) + } + + a = c.lines + b = []*drone.Line{{Number: 0, Message: "foo"}} + if diff := cmp.Diff(a, b, optNoTS); diff != "" { + t.Fail() + t.Log(diff) + } + + w.Write([]byte("bar")) + c.waitUpload() + + a = w.lineList.peekPending() + if len(a) != 0 { + t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a)) + } + + a = c.lines + b = []*drone.Line{{Number: 0, Message: "foo"}, {Number: 1, Message: "bar"}} + if diff := cmp.Diff(a, b, optNoTS); diff != "" { + t.Fail() + t.Log(diff) + } + + w.Close() + + a = c.uploaded + if diff := cmp.Diff(a, b, optNoTS); diff != "" { + t.Fail() + t.Log(diff) + } +} + +// TestLineWriterLimit tests if the history contains only last uploaded content after the limit has been breached. func TestLineWriterLimit(t *testing.T) { - client := new(mockClient) - w := New(client, 0) - if got, want := w.limit, defaultLimit; got != want { + c := newMockClient() + + w := New(c, 0) + if got, want := w.GetLimit(), defaultLimit; got != want { t.Errorf("Expect default buffer limit %d, got %d", want, got) } + w.SetLimit(6) - if got, want := w.limit, 6; got != want { + + if got, want := w.GetLimit(), 6; got != want { t.Errorf("Expect custom buffer limit %d, got %d", want, got) } w.Write([]byte("foo")) w.Write([]byte("bar")) - w.Write([]byte("baz")) + w.Write([]byte("baz")) // this write overflows the buffer, so "foo" is removed from the history - if got, want := w.size, 6; got != want { + if got, want := w.GetSize(), 6; got != want { t.Errorf("Expect buffer size %d, got %d", want, got) } - a := w.history - b := []*drone.Line{ - {Number: 1, Message: "bar"}, - {Number: 2, Message: "baz"}, + a := w.lineList.peekHistory() + b := []*drone.Line{{Number: 1, Message: "bar"}, {Number: 2, Message: "baz"}} + if diff := cmp.Diff(a, b, optNoTS); diff != "" { + t.Fail() + t.Log(diff) } - if diff := cmp.Diff(a, b); diff != "" { + + w.Write([]byte("boss")) // "boss" and "baz" are 7 bytes, so "bar" and "baz" are removed + + a = w.lineList.peekHistory() + b = []*drone.Line{{Number: 3, Message: "boss"}} + if diff := cmp.Diff(a, b, optNoTS); diff != "" { t.Fail() t.Log(diff) } + + w.Write([]byte("xy")) // this "xy" fits in the buffer so nothing should be removed now + + a = w.lineList.peekHistory() + b = []*drone.Line{{Number: 3, Message: "boss"}, {Number: 4, Message: "xy"}} + if diff := cmp.Diff(a, b, optNoTS); diff != "" { + t.Fail() + t.Log(diff) + } + + w.Close() +} + +// TestLineWriterLimitStopStreaming tests if streaming has been stopped after the buffer overflow. +func TestLineWriterLimitStopStreaming(t *testing.T) { + c := newMockClient() + w := New(c, 0) + w.SetLimit(8) + w.SetInterval(time.Nanosecond) + + w.Write([]byte("foo")) + if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 1 { + t.Errorf("Expected %d lines streamed, got %d", 1, len(c.lines)) + } + + w.Write([]byte("bar")) + if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 2 { + t.Errorf("Expected %d lines streamed, got %d", 2, len(c.lines)) + } + + w.Write([]byte("baz")) // overflow! streaming should be aborted + if uploaded := c.waitUpload(); uploaded || len(c.lines) != 2 { + t.Errorf("Expected streaming has been stopped. Streamed %d lines, expected %d", len(c.lines), 2) + } + + w.Close() + + if len(c.lines) != 2 { + t.Errorf("Closing should not trigged output streaming. Streamed %d lines, expected %d", len(c.lines), 2) + } +} + +// TestLineWriterOverLimit tests weird situation when data is written in chunks that exceed the limit. +func TestLineWriterOverLimit(t *testing.T) { + c := newMockClient() + + w := New(c, 0) + w.SetLimit(4) + + w.Write([]byte("foobar")) // over the limit, nothing should be written + + if got, want := w.GetSize(), 0; got != want { + t.Errorf("Expect buffer size %d, got %d", want, got) + } + + w.Close() + + if len(c.uploaded) != 0 { + t.Error("there should be no uploaded lines") + } +} + +func BenchmarkWriter_Write(b *testing.B) { + b.ReportAllocs() + c := &dummyClient{} + w := New(c, 0) + p := []byte("Lorem ipsum dolor sit amet,\nconsectetur adipiscing elit,\nsed do eiusmod tempor incididunt\nut labore et dolore magna aliqua.\n") + for i := 0; i < b.N; i++ { + w.Write(p) + } + w.Close() } type mockClient struct { client.Client - lines []*drone.Line - uploaded []*drone.Line + uploadDone chan struct{} + lines []*drone.Line + uploaded []*drone.Line +} + +func newMockClient() *mockClient { + return &mockClient{uploadDone: make(chan struct{}, 1)} +} + +// waitUpload waits a while for streaming to complete. Writer's interval should be set to very low value before this call. +func (m *mockClient) waitUpload() bool { + select { + case <-m.uploadDone: + return true + case <-time.After(10 * time.Millisecond): + return false + } } func (m *mockClient) Batch(ctx context.Context, id int64, lines []*drone.Line) error { m.lines = append(m.lines, lines...) + select { + case m.uploadDone <- struct{}{}: + default: + } return nil } @@ -90,3 +252,8 @@ func (m *mockClient) Upload(ctx context.Context, id int64, lines []*drone.Line) m.uploaded = lines return nil } + +type dummyClient struct{ client.Client } + +func (m *dummyClient) Batch(context.Context, int64, []*drone.Line) error { return nil } +func (m *dummyClient) Upload(context.Context, int64, []*drone.Line) error { return nil }