Merge pull request #26 from drone/revert-24-optimized-livelog

Revert "Optimized livelog streaming"
pull/25/merge
TP Honey 2 years ago committed by GitHub
commit d12a95813c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,189 +0,0 @@
// Copyright 2022 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Polyform License
// that can be found in the LICENSE file.
// Package livelog provides a Writer that collects pipeline
// output and streams to the central server.
package livelog
import (
"sync"
"time"
"github.com/drone/drone-go/drone"
)
type node struct {
drone.Line
next *node
}
type list struct {
sync.Mutex
lineCnt int
lineNow time.Time
size int
limit int
last *node
history *node
historyCount int
pending *node
pendingCount int
}
func makeList(limit int) *list {
return &list{
lineCnt: 0,
lineNow: time.Now(),
limit: limit,
}
}
func (l *list) SetLimit(limit int) {
l.Lock()
l.limit = limit
l.Unlock()
}
func (l *list) GetLimit() int {
l.Lock()
limit := l.limit
l.Unlock()
return limit
}
func (l *list) GetSize() int {
l.Lock()
size := l.size
l.Unlock()
return size
}
func (l *list) Push(p []byte) (overflow bool) {
l.Lock()
for _, part := range split(p) {
line := drone.Line{
Number: l.lineCnt,
Message: part,
Timestamp: int64(time.Since(l.lineNow).Seconds()),
}
l.lineNow = time.Now()
l.lineCnt++
overflow = overflow || l.push(line)
}
l.Unlock()
return overflow
}
func (l *list) push(line drone.Line) bool {
n := &node{
Line: line,
next: nil,
}
// put the element to list
l.size += len(line.Message)
if l.last != nil {
l.last.next = n
}
l.last = n
if l.history == nil {
l.history = n
}
l.historyCount++
if l.pending == nil {
l.pending = n
}
l.pendingCount++
// overflow check
var overflow bool
for l.size > l.limit && l.history != nil {
drop := l.history
next := drop.next
if l.pending == drop {
l.pending = next
l.pendingCount--
}
l.history = next
l.historyCount--
if l.history == nil {
l.last = nil
}
l.size -= len(drop.Line.Message)
overflow = true
}
return overflow
}
func (l *list) peekPending() (lines []*drone.Line) {
l.Lock()
lines = toSlice(l.pendingCount, l.pending)
l.Unlock()
return
}
// Pending returns lines added since the previous call to this method.
func (l *list) Pending() (lines []*drone.Line) {
l.Lock()
lines = toSlice(l.pendingCount, l.pending)
l.pending = nil
l.pendingCount = 0
l.Unlock()
return
}
func (l *list) peekHistory() (lines []*drone.Line) {
l.Lock()
lines = toSlice(l.historyCount, l.history)
l.Unlock()
return lines
}
// History returns full history stored in the buffer and clears the buffer.
func (l *list) History() (lines []*drone.Line) {
l.Lock()
lines = toSlice(l.historyCount, l.history)
l.history = nil
l.historyCount = 0
l.pending = nil
l.pendingCount = 0
l.last = nil
l.size = 0
l.Unlock()
return lines
}
func toSlice(count int, head *node) []*drone.Line {
if count == 0 {
return nil
}
lines := make([]*drone.Line, count)
for i, n := 0, head; n != nil; i, n = i+1, n.next {
lines[i] = &n.Line
}
return lines
}

@ -9,62 +9,55 @@ package livelog
import ( import (
"context" "context"
"strings" "strings"
"sync"
"time" "time"
"github.com/drone/drone-go/drone"
"github.com/drone/runner-go/client" "github.com/drone/runner-go/client"
) )
// defaultLimit is the default maximum log size in bytes. // defaultLimit is the default maximum log size in bytes.
const defaultLimit = 5242880 // 5MB const defaultLimit = 5242880 // 5MB
// Writer is an io.WriteCloser that sends logs to the server. // Writer is an io.Writer that sends logs to the server.
type Writer struct { type Writer struct {
sync.Mutex
client client.Client client client.Client
id int64 id int64
num int
now time.Time
size int
limit int
interval time.Duration interval time.Duration
lineList *list pending []*drone.Line
history []*drone.Line
stopStreamFn func() closed bool
doneStream <-chan struct{} close chan struct{}
ready chan struct{} ready chan struct{}
} }
// New returns a new Writer. // New returns a new Wrtier.
func New(client client.Client, id int64) *Writer { func New(client client.Client, id int64) *Writer {
streamCtx, stopStream := context.WithCancel(context.Background())
b := &Writer{ b := &Writer{
client: client, client: client,
id: id, id: id,
interval: time.Second, now: time.Now(),
lineList: makeList(defaultLimit), limit: defaultLimit,
stopStreamFn: stopStream, interval: time.Second,
doneStream: streamCtx.Done(), close: make(chan struct{}),
ready: make(chan struct{}, 1), ready: make(chan struct{}, 1),
} }
// a call to stopStream() stops this goroutine.
// this happens when the Close method is called or after overflow of output data (>limit).
go b.start() go b.start()
return b return b
} }
// SetLimit sets the Writer limit. // SetLimit sets the Writer limit.
func (b *Writer) SetLimit(limit int) { func (b *Writer) SetLimit(limit int) {
b.lineList.SetLimit(limit) b.limit = limit
}
// GetLimit returns the Writer limit.
func (b *Writer) GetLimit() int {
return b.lineList.GetLimit()
}
// GetSize returns amount of output data the Writer currently holds.
func (b *Writer) GetSize() int {
return b.lineList.GetSize()
} }
// SetInterval sets the Writer flusher interval. // SetInterval sets the Writer flusher interval.
@ -74,8 +67,31 @@ func (b *Writer) SetInterval(interval time.Duration) {
// Write uploads the live log stream to the server. // Write uploads the live log stream to the server.
func (b *Writer) Write(p []byte) (n int, err error) { func (b *Writer) Write(p []byte) (n int, err error) {
if isOverLimit := b.lineList.Push(p); isOverLimit { for _, part := range split(p) {
b.stopStreamFn() line := &drone.Line{
Number: b.num,
Message: part,
Timestamp: int64(time.Since(b.now).Seconds()),
}
for b.size+len(p) > b.limit {
b.stop() // buffer is full, step streaming data
b.size -= len(b.history[0].Message)
b.history = b.history[1:]
}
b.size = b.size + len(part)
b.num++
if b.stopped() == false {
b.Lock()
b.pending = append(b.pending, line)
b.Unlock()
}
b.Lock()
b.history = append(b.history, line)
b.Unlock()
} }
select { select {
@ -86,48 +102,78 @@ func (b *Writer) Write(p []byte) (n int, err error) {
return len(p), nil return len(p), nil
} }
// Close closes the writer and uploads the full contents to the server. // Close closes the writer and uploads the full contents to
// the server.
func (b *Writer) Close() error { func (b *Writer) Close() error {
select { if b.stop() {
case <-b.doneStream: b.flush()
default:
b.stopStreamFn()
_ = b.flush() // send all pending lines
} }
return b.upload()
return b.upload() // upload full log history
} }
// upload uploads the full log history to the server. // upload uploads the full log history to the server.
func (b *Writer) upload() error { func (b *Writer) upload() error {
return b.client.Upload(context.Background(), b.id, b.lineList.History()) return b.client.Upload(
context.Background(), b.id, b.history)
} }
// flush batch uploads all buffered logs to the server. // flush batch uploads all buffered logs to the server.
func (b *Writer) flush() error { func (b *Writer) flush() error {
lines := b.lineList.Pending() b.Lock()
lines := b.copy()
b.clear()
b.Unlock()
if len(lines) == 0 { if len(lines) == 0 {
return nil return nil
} }
return b.client.Batch(
context.Background(), b.id, lines)
}
// copy returns a copy of the buffered lines.
func (b *Writer) copy() []*drone.Line {
return append(b.pending[:0:0], b.pending...)
}
// clear clears the buffer.
func (b *Writer) clear() {
b.pending = b.pending[:0]
}
func (b *Writer) stop() bool {
b.Lock()
var closed bool
if b.closed == false {
close(b.close)
closed = true
b.closed = true
}
b.Unlock()
return closed
}
return b.client.Batch(context.Background(), b.id, lines) func (b *Writer) stopped() bool {
b.Lock()
closed := b.closed
b.Unlock()
return closed
} }
func (b *Writer) start() { func (b *Writer) start() error {
for { for {
select { select {
case <-b.doneStream: case <-b.close:
return return nil
case <-b.ready: case <-b.ready:
select { select {
case <-b.doneStream: case <-b.close:
return return nil
case <-time.After(b.interval): case <-time.After(b.interval):
// we intentionally ignore errors. log streams // we intentionally ignore errors. log streams
// are ephemeral and are considered low priority // are ephemeral and are considered low prioirty
// because they are not required for drone to // because they are not required for drone to
// operator, and the impact of failure is minimal // operator, and the impact of failure is minimal
_ = b.flush() b.flush()
} }
} }
} }

@ -9,242 +9,80 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/drone/drone-go/drone" "github.com/drone/drone-go/drone"
"github.com/drone/runner-go/client" "github.com/drone/runner-go/client"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
) )
var optNoTS = cmpopts.IgnoreFields(drone.Line{}, "Timestamp") func TestLineWriterSingle(t *testing.T) {
client := new(mockClient)
// TestLineWriterClose tests if closing the Writer triggers streaming of all pending lines and upload of the full history. w := New(client, 1)
func TestLineWriterClose(t *testing.T) { w.SetInterval(time.Duration(0))
c := newMockClient() w.num = 4
w := New(c, 1)
w.SetInterval(time.Hour) // make sure it does not stream data
w.lineList.lineCnt = 4 // new lines are starting from the Number=4
w.Write([]byte("foo\nbar\n")) w.Write([]byte("foo\nbar\n"))
a := w.lineList.peekPending() a := w.pending
b := []*drone.Line{ b := []*drone.Line{
{Number: 4, Message: "foo\n"}, {Number: 4, Message: "foo\n"},
{Number: 5, Message: "bar\n"}, {Number: 5, Message: "bar\n"},
{Number: 6, Message: ""}, {Number: 6, Message: ""},
} }
if diff := cmp.Diff(a, b, optNoTS); diff != "" { if diff := cmp.Diff(a, b); diff != "" {
t.Fail() t.Fail()
t.Log(diff) t.Log(diff)
} }
if len(c.uploaded) != 0 || len(c.lines) != 0 {
t.Error("Expected nothing has been streamed or uploaded")
}
w.Close() w.Close()
a = client.uploaded
if diff := cmp.Diff(c.lines, b, optNoTS); diff != "" { if diff := cmp.Diff(a, b); diff != "" {
t.Error("Expected all output has been streamed")
t.Log(diff)
}
if diff := cmp.Diff(c.uploaded, b, optNoTS); diff != "" {
t.Error("Expected all output has been uploaded")
t.Log(diff)
}
if len(w.lineList.peekPending()) > 0 {
t.Errorf("Expect empty buffer")
}
}
// TestLineWriterStreaming tests if streaming is done correctly through a client.
func TestLineWriterStreaming(t *testing.T) {
c := newMockClient()
w := New(c, 1)
w.SetInterval(time.Nanosecond)
w.Write([]byte("foo"))
c.waitUpload()
var a, b []*drone.Line
a = w.lineList.peekPending()
if len(a) != 0 {
t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a))
}
a = c.lines
b = []*drone.Line{{Number: 0, Message: "foo"}}
if diff := cmp.Diff(a, b, optNoTS); diff != "" {
t.Fail() t.Fail()
t.Log(diff) t.Log(diff)
} }
w.Write([]byte("bar")) if len(w.pending) > 0 {
c.waitUpload() t.Errorf("Expect empty buffer")
a = w.lineList.peekPending()
if len(a) != 0 {
t.Errorf("Expected that all lines are uploaded, but there are still %d pending lines", len(a))
}
a = c.lines
b = []*drone.Line{{Number: 0, Message: "foo"}, {Number: 1, Message: "bar"}}
if diff := cmp.Diff(a, b, optNoTS); diff != "" {
t.Fail()
t.Log(diff)
}
w.Close()
a = c.uploaded
if diff := cmp.Diff(a, b, optNoTS); diff != "" {
t.Fail()
t.Log(diff)
} }
} }
// TestLineWriterLimit tests if the history contains only last uploaded content after the limit has been breached.
func TestLineWriterLimit(t *testing.T) { func TestLineWriterLimit(t *testing.T) {
c := newMockClient() client := new(mockClient)
w := New(client, 0)
w := New(c, 0) if got, want := w.limit, defaultLimit; got != want {
if got, want := w.GetLimit(), defaultLimit; got != want {
t.Errorf("Expect default buffer limit %d, got %d", want, got) t.Errorf("Expect default buffer limit %d, got %d", want, got)
} }
w.SetLimit(6) w.SetLimit(6)
if got, want := w.limit, 6; got != want {
if got, want := w.GetLimit(), 6; got != want {
t.Errorf("Expect custom buffer limit %d, got %d", want, got) t.Errorf("Expect custom buffer limit %d, got %d", want, got)
} }
w.Write([]byte("foo")) w.Write([]byte("foo"))
w.Write([]byte("bar")) w.Write([]byte("bar"))
w.Write([]byte("baz")) // this write overflows the buffer, so "foo" is removed from the history w.Write([]byte("baz"))
if got, want := w.GetSize(), 6; got != want { if got, want := w.size, 6; got != want {
t.Errorf("Expect buffer size %d, got %d", want, got) t.Errorf("Expect buffer size %d, got %d", want, got)
} }
a := w.lineList.peekHistory() a := w.history
b := []*drone.Line{{Number: 1, Message: "bar"}, {Number: 2, Message: "baz"}} b := []*drone.Line{
if diff := cmp.Diff(a, b, optNoTS); diff != "" { {Number: 1, Message: "bar"},
t.Fail() {Number: 2, Message: "baz"},
t.Log(diff)
}
w.Write([]byte("boss")) // "boss" and "baz" are 7 bytes, so "bar" and "baz" are removed
a = w.lineList.peekHistory()
b = []*drone.Line{{Number: 3, Message: "boss"}}
if diff := cmp.Diff(a, b, optNoTS); diff != "" {
t.Fail()
t.Log(diff)
} }
if diff := cmp.Diff(a, b); diff != "" {
w.Write([]byte("xy")) // this "xy" fits in the buffer so nothing should be removed now
a = w.lineList.peekHistory()
b = []*drone.Line{{Number: 3, Message: "boss"}, {Number: 4, Message: "xy"}}
if diff := cmp.Diff(a, b, optNoTS); diff != "" {
t.Fail() t.Fail()
t.Log(diff) t.Log(diff)
} }
w.Close()
}
// TestLineWriterLimitStopStreaming tests if streaming has been stopped after the buffer overflow.
func TestLineWriterLimitStopStreaming(t *testing.T) {
c := newMockClient()
w := New(c, 0)
w.SetLimit(8)
w.SetInterval(time.Nanosecond)
w.Write([]byte("foo"))
if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 1 {
t.Errorf("Expected %d lines streamed, got %d", 1, len(c.lines))
}
w.Write([]byte("bar"))
if uploaded := c.waitUpload(); !uploaded || len(c.lines) != 2 {
t.Errorf("Expected %d lines streamed, got %d", 2, len(c.lines))
}
w.Write([]byte("baz")) // overflow! streaming should be aborted
if uploaded := c.waitUpload(); uploaded || len(c.lines) != 2 {
t.Errorf("Expected streaming has been stopped. Streamed %d lines, expected %d", len(c.lines), 2)
}
w.Close()
if len(c.lines) != 2 {
t.Errorf("Closing should not trigged output streaming. Streamed %d lines, expected %d", len(c.lines), 2)
}
}
// TestLineWriterOverLimit tests weird situation when data is written in chunks that exceed the limit.
func TestLineWriterOverLimit(t *testing.T) {
c := newMockClient()
w := New(c, 0)
w.SetLimit(4)
w.Write([]byte("foobar")) // over the limit, nothing should be written
if got, want := w.GetSize(), 0; got != want {
t.Errorf("Expect buffer size %d, got %d", want, got)
}
w.Close()
if len(c.uploaded) != 0 {
t.Error("there should be no uploaded lines")
}
}
func BenchmarkWriter_Write(b *testing.B) {
b.ReportAllocs()
c := &dummyClient{}
w := New(c, 0)
p := []byte("Lorem ipsum dolor sit amet,\nconsectetur adipiscing elit,\nsed do eiusmod tempor incididunt\nut labore et dolore magna aliqua.\n")
for i := 0; i < b.N; i++ {
w.Write(p)
}
w.Close()
} }
type mockClient struct { type mockClient struct {
client.Client client.Client
uploadDone chan struct{} lines []*drone.Line
lines []*drone.Line uploaded []*drone.Line
uploaded []*drone.Line
}
func newMockClient() *mockClient {
return &mockClient{uploadDone: make(chan struct{}, 1)}
}
// waitUpload waits a while for streaming to complete. Writer's interval should be set to very low value before this call.
func (m *mockClient) waitUpload() bool {
select {
case <-m.uploadDone:
return true
case <-time.After(10 * time.Millisecond):
return false
}
} }
func (m *mockClient) Batch(ctx context.Context, id int64, lines []*drone.Line) error { func (m *mockClient) Batch(ctx context.Context, id int64, lines []*drone.Line) error {
m.lines = append(m.lines, lines...) m.lines = append(m.lines, lines...)
select {
case m.uploadDone <- struct{}{}:
default:
}
return nil return nil
} }
@ -252,8 +90,3 @@ func (m *mockClient) Upload(ctx context.Context, id int64, lines []*drone.Line)
m.uploaded = lines m.uploaded = lines
return nil return nil
} }
type dummyClient struct{ client.Client }
func (m *dummyClient) Batch(context.Context, int64, []*drone.Line) error { return nil }
func (m *dummyClient) Upload(context.Context, int64, []*drone.Line) error { return nil }

Loading…
Cancel
Save