From 1658440152c83c8d3c73f357fb41d5393633bd9b Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Fri, 22 Oct 2021 13:49:46 +0100 Subject: [PATCH 01/10] add uploader to support streaming a card to the server --- client/client.go | 3 +++ client/http.go | 8 +++++++ pipeline/runtime/execer.go | 12 ++++++++++ pipeline/runtime/type.go | 3 +++ pipeline/uploader.go | 18 ++++++++++++++ pipeline/uploader/upload.go | 40 ++++++++++++++++++++++++++++++++ pipeline/uploader/upload_test.go | 1 + pipeline/uploader_test.go | 1 + 8 files changed, 86 insertions(+) create mode 100644 pipeline/uploader.go create mode 100644 pipeline/uploader/upload.go create mode 100644 pipeline/uploader/upload_test.go create mode 100644 pipeline/uploader_test.go diff --git a/client/client.go b/client/client.go index 39c8e63..e673d20 100644 --- a/client/client.go +++ b/client/client.go @@ -89,4 +89,7 @@ type Client interface { // Upload uploads the full logs to the server. Upload(ctx context.Context, step int64, lines []*drone.Line) error + + // UploadCard uploads a card to drone server. + UploadCard(ctx context.Context, step int64, card *drone.CardInput) error } diff --git a/client/http.go b/client/http.go index ea4fcb8..06f196a 100644 --- a/client/http.go +++ b/client/http.go @@ -30,6 +30,7 @@ const ( endpointWatch = "/rpc/v2/build/%d/watch" endpointBatch = "/rpc/v2/step/%d/logs/batch" endpointUpload = "/rpc/v2/step/%d/logs/upload" + endpointCard = "/rpc/v2/step/%d/card" ) var _ Client = (*HTTPClient)(nil) @@ -201,6 +202,13 @@ func (p *HTTPClient) Upload(ctx context.Context, step int64, lines []*drone.Line return err } +// UploadCard uploads a card to drone server. +func (p *HTTPClient) UploadCard(ctx context.Context, step int64, card *drone.CardInput) error { + uri := fmt.Sprintf(endpointCard, step) + _, err := p.retry(ctx, uri, "POST", &card, nil) + return err +} + func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) { for { res, err := p.do(ctx, path, method, in, out) diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index 37a1172..5fdc861 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -24,6 +24,7 @@ type Execer struct { engine Engine reporter pipeline.Reporter streamer pipeline.Streamer + uploader pipeline.Uploader sem *semaphore.Weighted } @@ -31,6 +32,7 @@ type Execer struct { func NewExecer( reporter pipeline.Reporter, streamer pipeline.Streamer, + uploader pipeline.Uploader, engine Engine, threads int64, ) *Execer { @@ -38,6 +40,7 @@ func NewExecer( reporter: reporter, streamer: streamer, engine: engine, + uploader: uploader, } if threads > 0 { // optional semaphore that limits the number of steps @@ -250,6 +253,15 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste result = multierror.Append(result, err) } + // stream card data to server if exists + file, _ := e.engine.StreamFile(ctx, copy, "/tmp/card.json") + if file != nil { + s := state.Find(step.GetName()) + err = e.uploader.UploadCard(ctx, file, s.ID) + if err != nil { + return nil + } + } // if the context was cancelled and returns a Canceled or // DeadlineExceeded error this indicates the pipeline was // cancelled. diff --git a/pipeline/runtime/type.go b/pipeline/runtime/type.go index 38fa4de..3aa66d0 100644 --- a/pipeline/runtime/type.go +++ b/pipeline/runtime/type.go @@ -84,6 +84,9 @@ type ( // Run runs the pipeline step. Run(context.Context, Spec, Step, io.Writer) (*State, error) + + // StreamFile copies a file to the server + StreamFile(context.Context, Step, string) (io.ReadCloser, error) } // Spec is an interface that must be implemented by all diff --git a/pipeline/uploader.go b/pipeline/uploader.go new file mode 100644 index 0000000..543928a --- /dev/null +++ b/pipeline/uploader.go @@ -0,0 +1,18 @@ +package pipeline + +import ( + "context" + "io" +) + +type Uploader interface { + UploadCard(ctx context.Context, r io.ReadCloser, step int64) error +} + +func NopUploader() Uploader { + return new(nopUploader) +} + +type nopUploader struct{} + +func (*nopUploader) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { return nil } diff --git a/pipeline/uploader/upload.go b/pipeline/uploader/upload.go new file mode 100644 index 0000000..2feb3b8 --- /dev/null +++ b/pipeline/uploader/upload.go @@ -0,0 +1,40 @@ +package uploader + +import ( + "context" + "encoding/json" + "io" + + "github.com/drone/drone-go/drone" + "github.com/drone/runner-go/client" + "github.com/drone/runner-go/pipeline" +) + +var _ pipeline.Uploader = (*Upload)(nil) + +type Upload struct { + client client.Client +} + +func New(client client.Client) *Upload { + return &Upload{ + client: client, + } +} + +func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { + bytes, err := io.ReadAll(r) + if err != nil { + return err + } + card := drone.CardInput{} + err = json.Unmarshal(bytes, &card) + if err != nil { + return err + } + err = s.client.UploadCard(ctx, step, &card) + if err != nil { + return err + } + return nil +} diff --git a/pipeline/uploader/upload_test.go b/pipeline/uploader/upload_test.go new file mode 100644 index 0000000..01aafa6 --- /dev/null +++ b/pipeline/uploader/upload_test.go @@ -0,0 +1 @@ +package uploader diff --git a/pipeline/uploader_test.go b/pipeline/uploader_test.go new file mode 100644 index 0000000..fb2071c --- /dev/null +++ b/pipeline/uploader_test.go @@ -0,0 +1 @@ +package pipeline From 386295f94df54575429744780a4b361111978f90 Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Tue, 26 Oct 2021 15:17:41 +0100 Subject: [PATCH 02/10] move schema update on step to runnner go logic --- internal/merge.go | 1 + pipeline/runtime/execer.go | 5 ++--- pipeline/runtime/type.go | 2 +- pipeline/uploader.go | 4 ++-- pipeline/uploader/upload.go | 17 +++++++++++++++-- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/merge.go b/internal/merge.go index 98a6722..08a12f9 100644 --- a/internal/merge.go +++ b/internal/merge.go @@ -25,4 +25,5 @@ func MergeStep(src, dst *drone.Step) { dst.Started = src.Started dst.Stopped = src.Stopped dst.Version = src.Version + dst.Schema = src.Schema } diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index 5fdc861..4b3d28d 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -254,10 +254,9 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste } // stream card data to server if exists - file, _ := e.engine.StreamFile(ctx, copy, "/tmp/card.json") + file, _ := e.engine.StreamFile(ctx, spec, copy, "/tmp/card.json") if file != nil { - s := state.Find(step.GetName()) - err = e.uploader.UploadCard(ctx, file, s.ID) + err = e.uploader.UploadCard(ctx, file, state, step.GetName()) if err != nil { return nil } diff --git a/pipeline/runtime/type.go b/pipeline/runtime/type.go index 3aa66d0..8ded0fa 100644 --- a/pipeline/runtime/type.go +++ b/pipeline/runtime/type.go @@ -86,7 +86,7 @@ type ( Run(context.Context, Spec, Step, io.Writer) (*State, error) // StreamFile copies a file to the server - StreamFile(context.Context, Step, string) (io.ReadCloser, error) + StreamFile(context.Context, Spec, Step, string) (io.ReadCloser, error) } // Spec is an interface that must be implemented by all diff --git a/pipeline/uploader.go b/pipeline/uploader.go index 543928a..d2a5df4 100644 --- a/pipeline/uploader.go +++ b/pipeline/uploader.go @@ -6,7 +6,7 @@ import ( ) type Uploader interface { - UploadCard(ctx context.Context, r io.ReadCloser, step int64) error + UploadCard(context.Context, io.ReadCloser, *State, string) error } func NopUploader() Uploader { @@ -15,4 +15,4 @@ func NopUploader() Uploader { type nopUploader struct{} -func (*nopUploader) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { return nil } +func (*nopUploader) UploadCard(context.Context, io.ReadCloser, *State, string) error { return nil } diff --git a/pipeline/uploader/upload.go b/pipeline/uploader/upload.go index 2feb3b8..3d1628e 100644 --- a/pipeline/uploader/upload.go +++ b/pipeline/uploader/upload.go @@ -7,6 +7,7 @@ import ( "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" + "github.com/drone/runner-go/internal" "github.com/drone/runner-go/pipeline" ) @@ -22,7 +23,8 @@ func New(client client.Client) *Upload { } } -func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { +func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, state *pipeline.State, stepName string) error { + src := state.Find(stepName) bytes, err := io.ReadAll(r) if err != nil { return err @@ -32,9 +34,20 @@ func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, step int64) er if err != nil { return err } - err = s.client.UploadCard(ctx, step, &card) + err = s.client.UploadCard(ctx, src.ID, &card) if err != nil { return err } + // update step schema + state.Lock() + src.Schema = card.Schema + cpy := internal.CloneStep(src) + state.Unlock() + err = s.client.UpdateStep(ctx, cpy) + if err == nil { + state.Lock() + internal.MergeStep(cpy, src) + state.Unlock() + } return nil } From 0a95ecec9830c0cd9d0092927b0defe359ad3a35 Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Wed, 3 Nov 2021 11:37:28 +0000 Subject: [PATCH 03/10] add support for kubernetes runner --- livelog/copy.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/livelog/copy.go b/livelog/copy.go index 6e787bc..70b92e5 100644 --- a/livelog/copy.go +++ b/livelog/copy.go @@ -6,15 +6,29 @@ package livelog import ( "bufio" + "encoding/base64" "io" + "io/ioutil" + "regexp" ) +var re = regexp.MustCompile("#((.*?)#)") + // Copy copies from src to dst and removes until either EOF // is reached on src or an error occurs. func Copy(dst io.Writer, src io.ReadCloser) error { r := bufio.NewReader(src) for { bytes, err := r.ReadBytes('\n') + // check logs for card data + card := re.FindStringSubmatch(string(bytes)) + if card != nil { + data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) + if err == nil { + _ = ioutil.WriteFile("/tmp/card.json", data, 0644) + } + continue + } if _, err := dst.Write(bytes); err != nil { return err } From fe9f999fb09aabb84cf60ebc3ae34fb0b0ea44e7 Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Thu, 4 Nov 2021 12:05:07 +0000 Subject: [PATCH 04/10] provide ability to read and upload cards to drone server. --- livelog/copy.go | 14 ------------- livelog/extractor/writer.go | 40 +++++++++++++++++++++++++++++++++++++ pipeline/runtime/execer.go | 18 +++++++++++------ pipeline/runtime/type.go | 3 --- pipeline/uploader.go | 5 ++--- pipeline/uploader/upload.go | 9 ++------- 6 files changed, 56 insertions(+), 33 deletions(-) create mode 100644 livelog/extractor/writer.go diff --git a/livelog/copy.go b/livelog/copy.go index 70b92e5..6e787bc 100644 --- a/livelog/copy.go +++ b/livelog/copy.go @@ -6,29 +6,15 @@ package livelog import ( "bufio" - "encoding/base64" "io" - "io/ioutil" - "regexp" ) -var re = regexp.MustCompile("#((.*?)#)") - // Copy copies from src to dst and removes until either EOF // is reached on src or an error occurs. func Copy(dst io.Writer, src io.ReadCloser) error { r := bufio.NewReader(src) for { bytes, err := r.ReadBytes('\n') - // check logs for card data - card := re.FindStringSubmatch(string(bytes)) - if card != nil { - data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) - if err == nil { - _ = ioutil.WriteFile("/tmp/card.json", data, 0644) - } - continue - } if _, err := dst.Write(bytes); err != nil { return err } diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go new file mode 100644 index 0000000..a7d2211 --- /dev/null +++ b/livelog/extractor/writer.go @@ -0,0 +1,40 @@ +package extractor + +import ( + "encoding/base64" + "io" + "regexp" +) + +var re = regexp.MustCompile("#((.*?)#)") + +type Writer struct { + base io.Writer + file []byte +} + +func New(w io.Writer) *Writer { + return &Writer{w, nil} +} + +func (e *Writer) Write(p []byte) (n int, err error) { + card := re.FindStringSubmatch(string(p)) + if card == nil { + return e.base.Write(p) + } + + data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) + if err == nil { + e.file = data + } + // remove encoded string for logs + return e.base.Write([]byte("")) +} + +func (e *Writer) File() ([]byte, bool) { + if len(e.file) > 0 { + return e.file, true + } else { + return nil, false + } +} diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index 4b3d28d..bcd5092 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -8,6 +8,8 @@ import ( "context" "sync" + "github.com/drone/runner-go/livelog/extractor" + "github.com/drone/drone-go/drone" "github.com/drone/runner-go/environ" "github.com/drone/runner-go/logger" @@ -235,17 +237,20 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste wc := e.streamer.Stream(noContext, state, step.GetName()) wc = newReplacer(wc, secretSlice(step)) + // wrap writer in extrator + ext := extractor.New(wc) + // if the step is configured as a daemon, it is detached // from the main process and executed separately. if step.IsDetached() { go func() { - e.engine.Run(ctx, spec, copy, wc) + e.engine.Run(ctx, spec, copy, ext) wc.Close() }() return nil } - exited, err := e.engine.Run(ctx, spec, copy, wc) + exited, err := e.engine.Run(ctx, spec, copy, ext) // close the stream. If the session is a remote session, the // full log buffer is uploaded to the remote server. @@ -253,14 +258,15 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste result = multierror.Append(result, err) } - // stream card data to server if exists - file, _ := e.engine.StreamFile(ctx, spec, copy, "/tmp/card.json") - if file != nil { - err = e.uploader.UploadCard(ctx, file, state, step.GetName()) + // upload card if exists + card, ok := ext.File() + if ok { + err = e.uploader.UploadCard(ctx, card, state, step.GetName()) if err != nil { return nil } } + // if the context was cancelled and returns a Canceled or // DeadlineExceeded error this indicates the pipeline was // cancelled. diff --git a/pipeline/runtime/type.go b/pipeline/runtime/type.go index 8ded0fa..38fa4de 100644 --- a/pipeline/runtime/type.go +++ b/pipeline/runtime/type.go @@ -84,9 +84,6 @@ type ( // Run runs the pipeline step. Run(context.Context, Spec, Step, io.Writer) (*State, error) - - // StreamFile copies a file to the server - StreamFile(context.Context, Spec, Step, string) (io.ReadCloser, error) } // Spec is an interface that must be implemented by all diff --git a/pipeline/uploader.go b/pipeline/uploader.go index d2a5df4..f80dc7c 100644 --- a/pipeline/uploader.go +++ b/pipeline/uploader.go @@ -2,11 +2,10 @@ package pipeline import ( "context" - "io" ) type Uploader interface { - UploadCard(context.Context, io.ReadCloser, *State, string) error + UploadCard(context.Context, []byte, *State, string) error } func NopUploader() Uploader { @@ -15,4 +14,4 @@ func NopUploader() Uploader { type nopUploader struct{} -func (*nopUploader) UploadCard(context.Context, io.ReadCloser, *State, string) error { return nil } +func (*nopUploader) UploadCard(context.Context, []byte, *State, string) error { return nil } diff --git a/pipeline/uploader/upload.go b/pipeline/uploader/upload.go index 3d1628e..55b69e4 100644 --- a/pipeline/uploader/upload.go +++ b/pipeline/uploader/upload.go @@ -3,7 +3,6 @@ package uploader import ( "context" "encoding/json" - "io" "github.com/drone/drone-go/drone" "github.com/drone/runner-go/client" @@ -23,14 +22,10 @@ func New(client client.Client) *Upload { } } -func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, state *pipeline.State, stepName string) error { +func (s *Upload) UploadCard(ctx context.Context, bytes []byte, state *pipeline.State, stepName string) error { src := state.Find(stepName) - bytes, err := io.ReadAll(r) - if err != nil { - return err - } card := drone.CardInput{} - err = json.Unmarshal(bytes, &card) + err := json.Unmarshal(bytes, &card) if err != nil { return err } From 0817a8dbf2245b0d14df4d42ce2888cc3b7294cd Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Thu, 4 Nov 2021 12:07:44 +0000 Subject: [PATCH 05/10] order imports --- pipeline/runtime/execer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index bcd5092..b4587b4 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -8,10 +8,9 @@ import ( "context" "sync" - "github.com/drone/runner-go/livelog/extractor" - "github.com/drone/drone-go/drone" "github.com/drone/runner-go/environ" + "github.com/drone/runner-go/livelog/extractor" "github.com/drone/runner-go/logger" "github.com/drone/runner-go/pipeline" From 39ece7e74a3019f4e00b61ff27bd18966ac945fd Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Thu, 4 Nov 2021 15:35:56 +0000 Subject: [PATCH 06/10] refactor change --- go.mod | 3 +-- go.sum | 2 ++ livelog/extractor/writer.go | 6 +++--- pipeline/runtime/execer.go | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index d83bba5..eacc27b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/buildkite/yaml v2.1.0+incompatible github.com/coreos/go-semver v0.3.0 github.com/docker/go-units v0.4.0 - github.com/drone/drone-go v1.6.0 + github.com/drone/drone-go v1.7.1 github.com/drone/envsubst v1.0.2 github.com/google/go-cmp v0.3.0 github.com/hashicorp/go-multierror v1.0.0 @@ -17,4 +17,3 @@ require ( golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) - diff --git a/go.sum b/go.sum index dbe4949..78a3e44 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/drone/drone-go v1.4.1-0.20201109202657-b9e58bbbcf27/go.mod h1:fxCf9jA github.com/drone/drone-go v1.5.0 h1:4rM74O3Xd6SnkdRIidlwwhVAPs4dXvcdVCgGvkrqL1M= github.com/drone/drone-go v1.5.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= github.com/drone/drone-go v1.6.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= +github.com/drone/drone-go v1.7.1 h1:ZX+3Rs8YHUSUQ5mkuMLmm1zr1ttiiE2YGNxF3AnyDKw= +github.com/drone/drone-go v1.7.1/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= github.com/drone/envsubst v1.0.2 h1:dpYLMAspQHW0a8dZpLRKe9jCNvIGZPhCPrycZzIHdqo= github.com/drone/envsubst v1.0.2/go.mod h1:bkZbnc/2vh1M12Ecn7EYScpI4YGYU0etwLJICOWi8Z0= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go index a7d2211..6170af6 100644 --- a/livelog/extractor/writer.go +++ b/livelog/extractor/writer.go @@ -1,6 +1,7 @@ package extractor import ( + "bytes" "encoding/base64" "io" "regexp" @@ -18,11 +19,10 @@ func New(w io.Writer) *Writer { } func (e *Writer) Write(p []byte) (n int, err error) { - card := re.FindStringSubmatch(string(p)) - if card == nil { + if bytes.HasPrefix(p, []byte("#")) == false { return e.base.Write(p) } - + card := re.FindStringSubmatch(string(p)) data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) if err == nil { e.file = data diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index b4587b4..26bc96b 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -262,7 +262,8 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste if ok { err = e.uploader.UploadCard(ctx, card, state, step.GetName()) if err != nil { - return nil + log.Warnln("cannot upload card") + result = multierror.Append(result, err) } } From 0c2b0a9c62a9e9bc0762f1fc477642d7de7a7e05 Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Mon, 8 Nov 2021 11:47:17 +0000 Subject: [PATCH 07/10] update logic to use ansi escape characters --- livelog/extractor/writer.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go index 6170af6..55780d6 100644 --- a/livelog/extractor/writer.go +++ b/livelog/extractor/writer.go @@ -7,7 +7,12 @@ import ( "regexp" ) -var re = regexp.MustCompile("#((.*?)#)") +const Esc = "\u001B" + +var ( + prefix = Esc + "]1338;" + re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)") +) type Writer struct { base io.Writer @@ -19,16 +24,18 @@ func New(w io.Writer) *Writer { } func (e *Writer) Write(p []byte) (n int, err error) { - if bytes.HasPrefix(p, []byte("#")) == false { + if bytes.HasPrefix(p, []byte(prefix)) == false { return e.base.Write(p) } card := re.FindStringSubmatch(string(p)) - data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) - if err == nil { - e.file = data + if len(card) != 0 { + data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) + if err == nil { + e.file = data + } + return e.base.Write([]byte("")) } - // remove encoded string for logs - return e.base.Write([]byte("")) + return e.base.Write(p) } func (e *Writer) File() ([]byte, bool) { From e6109dd869012d865c4604cd2e2b5c4b607fe5ea Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Tue, 9 Nov 2021 14:20:44 +0000 Subject: [PATCH 08/10] handle split logs when data stream is large --- livelog/extractor/writer.go | 46 ++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go index 55780d6..4817dcf 100644 --- a/livelog/extractor/writer.go +++ b/livelog/extractor/writer.go @@ -7,41 +7,49 @@ import ( "regexp" ) -const Esc = "\u001B" - var ( - prefix = Esc + "]1338;" + prefix = []byte("\u001B]1338;") + suffix = []byte("\u001B]0m") re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)") ) type Writer struct { - base io.Writer - file []byte + base io.Writer + file []byte + chunked bool } func New(w io.Writer) *Writer { - return &Writer{w, nil} + return &Writer{w, nil, false} } func (e *Writer) Write(p []byte) (n int, err error) { - if bytes.HasPrefix(p, []byte(prefix)) == false { + if bytes.HasPrefix(p, prefix) == false && e.chunked == false { return e.base.Write(p) } - card := re.FindStringSubmatch(string(p)) - if len(card) != 0 { - data, err := base64.StdEncoding.DecodeString(card[len(card)-1:][0]) - if err == nil { - e.file = data - } - return e.base.Write([]byte("")) - } - return e.base.Write(p) + + // if the data does not include the ansi suffix, + // it exceeds the size of the buffer and is chunked. + e.chunked = !bytes.Contains(p, suffix) + + // trim the ansi prefix and suffix from the data, + // and also trim any spacing or newlines that could + // cause confusion. + p = bytes.TrimSpace(p) + p = bytes.TrimPrefix(p, prefix) + p = bytes.TrimSuffix(p, suffix) + + e.file = append(e.file, p...) + return n, nil } func (e *Writer) File() ([]byte, bool) { - if len(e.file) > 0 { - return e.file, true - } else { + if len(e.file) == 0 { + return nil, false + } + data, err := base64.StdEncoding.DecodeString(string(e.file)) + if err != nil { return nil, false } + return data, true } From 00237bc5305ef716704ec8d37fb03a6bc9ba771b Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Tue, 9 Nov 2021 15:05:37 +0000 Subject: [PATCH 09/10] fixes issue with docker runner - short write error --- livelog/extractor/writer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go index 4817dcf..6a71ef8 100644 --- a/livelog/extractor/writer.go +++ b/livelog/extractor/writer.go @@ -27,6 +27,7 @@ func (e *Writer) Write(p []byte) (n int, err error) { if bytes.HasPrefix(p, prefix) == false && e.chunked == false { return e.base.Write(p) } + n = len(p) // if the data does not include the ansi suffix, // it exceeds the size of the buffer and is chunked. From 501cf9092d4ec9b9f8cf13ecabdfe961532d04e1 Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Tue, 9 Nov 2021 16:40:20 +0000 Subject: [PATCH 10/10] add feature flag to disable cards & check is valid json --- livelog/extractor/writer.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go index 6a71ef8..3903523 100644 --- a/livelog/extractor/writer.go +++ b/livelog/extractor/writer.go @@ -3,14 +3,17 @@ package extractor import ( "bytes" "encoding/base64" + "encoding/json" "io" + "os" "regexp" ) var ( - prefix = []byte("\u001B]1338;") - suffix = []byte("\u001B]0m") - re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)") + prefix = []byte("\u001B]1338;") + suffix = []byte("\u001B]0m") + re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)") + disableCards = os.Getenv("DRONE_FLAG_ENABLE_CARDS") == "false" ) type Writer struct { @@ -24,6 +27,9 @@ func New(w io.Writer) *Writer { } func (e *Writer) Write(p []byte) (n int, err error) { + if disableCards { + return e.base.Write(p) + } if bytes.HasPrefix(p, prefix) == false && e.chunked == false { return e.base.Write(p) } @@ -52,5 +58,13 @@ func (e *Writer) File() ([]byte, bool) { if err != nil { return nil, false } - return data, true + if isJSON(data) { + return data, true + } + return nil, false +} + +func isJSON(data []byte) bool { + var js json.RawMessage + return json.Unmarshal(data, &js) == nil }