diff --git a/client/client.go b/client/client.go index 39c8e63..e673d20 100644 --- a/client/client.go +++ b/client/client.go @@ -89,4 +89,7 @@ type Client interface { // Upload uploads the full logs to the server. Upload(ctx context.Context, step int64, lines []*drone.Line) error + + // UploadCard uploads a card to drone server. + UploadCard(ctx context.Context, step int64, card *drone.CardInput) error } diff --git a/client/http.go b/client/http.go index d7d72a2..81a5f3d 100644 --- a/client/http.go +++ b/client/http.go @@ -30,6 +30,7 @@ const ( endpointWatch = "/rpc/v2/build/%d/watch" endpointBatch = "/rpc/v2/step/%d/logs/batch" endpointUpload = "/rpc/v2/step/%d/logs/upload" + endpointCard = "/rpc/v2/step/%d/card" ) var _ Client = (*HTTPClient)(nil) @@ -201,6 +202,13 @@ func (p *HTTPClient) Upload(ctx context.Context, step int64, lines []*drone.Line return err } +// UploadCard uploads a card to drone server. +func (p *HTTPClient) UploadCard(ctx context.Context, step int64, card *drone.CardInput) error { + uri := fmt.Sprintf(endpointCard, step) + _, err := p.retry(ctx, uri, "POST", &card, nil) + return err +} + func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) { for { res, err := p.do(ctx, path, method, in, out) diff --git a/go.mod b/go.mod index d83bba5..eacc27b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/buildkite/yaml v2.1.0+incompatible github.com/coreos/go-semver v0.3.0 github.com/docker/go-units v0.4.0 - github.com/drone/drone-go v1.6.0 + github.com/drone/drone-go v1.7.1 github.com/drone/envsubst v1.0.2 github.com/google/go-cmp v0.3.0 github.com/hashicorp/go-multierror v1.0.0 @@ -17,4 +17,3 @@ require ( golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) - diff --git a/go.sum b/go.sum index dbe4949..78a3e44 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/drone/drone-go v1.4.1-0.20201109202657-b9e58bbbcf27/go.mod h1:fxCf9jA github.com/drone/drone-go v1.5.0 h1:4rM74O3Xd6SnkdRIidlwwhVAPs4dXvcdVCgGvkrqL1M= github.com/drone/drone-go v1.5.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= github.com/drone/drone-go v1.6.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= +github.com/drone/drone-go v1.7.1 h1:ZX+3Rs8YHUSUQ5mkuMLmm1zr1ttiiE2YGNxF3AnyDKw= +github.com/drone/drone-go v1.7.1/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg= github.com/drone/envsubst v1.0.2 h1:dpYLMAspQHW0a8dZpLRKe9jCNvIGZPhCPrycZzIHdqo= github.com/drone/envsubst v1.0.2/go.mod h1:bkZbnc/2vh1M12Ecn7EYScpI4YGYU0etwLJICOWi8Z0= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= diff --git a/internal/merge.go b/internal/merge.go index 98a6722..08a12f9 100644 --- a/internal/merge.go +++ b/internal/merge.go @@ -25,4 +25,5 @@ func MergeStep(src, dst *drone.Step) { dst.Started = src.Started dst.Stopped = src.Stopped dst.Version = src.Version + dst.Schema = src.Schema } diff --git a/livelog/extractor/writer.go b/livelog/extractor/writer.go new file mode 100644 index 0000000..3903523 --- /dev/null +++ b/livelog/extractor/writer.go @@ -0,0 +1,70 @@ +package extractor + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "io" + "os" + "regexp" +) + +var ( + prefix = []byte("\u001B]1338;") + suffix = []byte("\u001B]0m") + re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)") + disableCards = os.Getenv("DRONE_FLAG_ENABLE_CARDS") == "false" +) + +type Writer struct { + base io.Writer + file []byte + chunked bool +} + +func New(w io.Writer) *Writer { + return &Writer{w, nil, false} +} + +func (e *Writer) Write(p []byte) (n int, err error) { + if disableCards { + return e.base.Write(p) + } + if bytes.HasPrefix(p, prefix) == false && e.chunked == false { + return e.base.Write(p) + } + n = len(p) + + // if the data does not include the ansi suffix, + // it exceeds the size of the buffer and is chunked. + e.chunked = !bytes.Contains(p, suffix) + + // trim the ansi prefix and suffix from the data, + // and also trim any spacing or newlines that could + // cause confusion. + p = bytes.TrimSpace(p) + p = bytes.TrimPrefix(p, prefix) + p = bytes.TrimSuffix(p, suffix) + + e.file = append(e.file, p...) + return n, nil +} + +func (e *Writer) File() ([]byte, bool) { + if len(e.file) == 0 { + return nil, false + } + data, err := base64.StdEncoding.DecodeString(string(e.file)) + if err != nil { + return nil, false + } + if isJSON(data) { + return data, true + } + return nil, false +} + +func isJSON(data []byte) bool { + var js json.RawMessage + return json.Unmarshal(data, &js) == nil +} diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index 37a1172..26bc96b 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -10,6 +10,7 @@ import ( "github.com/drone/drone-go/drone" "github.com/drone/runner-go/environ" + "github.com/drone/runner-go/livelog/extractor" "github.com/drone/runner-go/logger" "github.com/drone/runner-go/pipeline" @@ -24,6 +25,7 @@ type Execer struct { engine Engine reporter pipeline.Reporter streamer pipeline.Streamer + uploader pipeline.Uploader sem *semaphore.Weighted } @@ -31,6 +33,7 @@ type Execer struct { func NewExecer( reporter pipeline.Reporter, streamer pipeline.Streamer, + uploader pipeline.Uploader, engine Engine, threads int64, ) *Execer { @@ -38,6 +41,7 @@ func NewExecer( reporter: reporter, streamer: streamer, engine: engine, + uploader: uploader, } if threads > 0 { // optional semaphore that limits the number of steps @@ -232,17 +236,20 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste wc := e.streamer.Stream(noContext, state, step.GetName()) wc = newReplacer(wc, secretSlice(step)) + // wrap writer in extrator + ext := extractor.New(wc) + // if the step is configured as a daemon, it is detached // from the main process and executed separately. if step.IsDetached() { go func() { - e.engine.Run(ctx, spec, copy, wc) + e.engine.Run(ctx, spec, copy, ext) wc.Close() }() return nil } - exited, err := e.engine.Run(ctx, spec, copy, wc) + exited, err := e.engine.Run(ctx, spec, copy, ext) // close the stream. If the session is a remote session, the // full log buffer is uploaded to the remote server. @@ -250,6 +257,16 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste result = multierror.Append(result, err) } + // upload card if exists + card, ok := ext.File() + if ok { + err = e.uploader.UploadCard(ctx, card, state, step.GetName()) + if err != nil { + log.Warnln("cannot upload card") + result = multierror.Append(result, err) + } + } + // if the context was cancelled and returns a Canceled or // DeadlineExceeded error this indicates the pipeline was // cancelled. diff --git a/pipeline/uploader.go b/pipeline/uploader.go new file mode 100644 index 0000000..f80dc7c --- /dev/null +++ b/pipeline/uploader.go @@ -0,0 +1,17 @@ +package pipeline + +import ( + "context" +) + +type Uploader interface { + UploadCard(context.Context, []byte, *State, string) error +} + +func NopUploader() Uploader { + return new(nopUploader) +} + +type nopUploader struct{} + +func (*nopUploader) UploadCard(context.Context, []byte, *State, string) error { return nil } diff --git a/pipeline/uploader/upload.go b/pipeline/uploader/upload.go new file mode 100644 index 0000000..55b69e4 --- /dev/null +++ b/pipeline/uploader/upload.go @@ -0,0 +1,48 @@ +package uploader + +import ( + "context" + "encoding/json" + + "github.com/drone/drone-go/drone" + "github.com/drone/runner-go/client" + "github.com/drone/runner-go/internal" + "github.com/drone/runner-go/pipeline" +) + +var _ pipeline.Uploader = (*Upload)(nil) + +type Upload struct { + client client.Client +} + +func New(client client.Client) *Upload { + return &Upload{ + client: client, + } +} + +func (s *Upload) UploadCard(ctx context.Context, bytes []byte, state *pipeline.State, stepName string) error { + src := state.Find(stepName) + card := drone.CardInput{} + err := json.Unmarshal(bytes, &card) + if err != nil { + return err + } + err = s.client.UploadCard(ctx, src.ID, &card) + if err != nil { + return err + } + // update step schema + state.Lock() + src.Schema = card.Schema + cpy := internal.CloneStep(src) + state.Unlock() + err = s.client.UpdateStep(ctx, cpy) + if err == nil { + state.Lock() + internal.MergeStep(cpy, src) + state.Unlock() + } + return nil +} diff --git a/pipeline/uploader/upload_test.go b/pipeline/uploader/upload_test.go new file mode 100644 index 0000000..01aafa6 --- /dev/null +++ b/pipeline/uploader/upload_test.go @@ -0,0 +1 @@ +package uploader diff --git a/pipeline/uploader_test.go b/pipeline/uploader_test.go new file mode 100644 index 0000000..fb2071c --- /dev/null +++ b/pipeline/uploader_test.go @@ -0,0 +1 @@ +package pipeline