From 1658440152c83c8d3c73f357fb41d5393633bd9b Mon Sep 17 00:00:00 2001 From: Eoin McAfee Date: Fri, 22 Oct 2021 13:49:46 +0100 Subject: [PATCH] add uploader to support streaming a card to the server --- client/client.go | 3 +++ client/http.go | 8 +++++++ pipeline/runtime/execer.go | 12 ++++++++++ pipeline/runtime/type.go | 3 +++ pipeline/uploader.go | 18 ++++++++++++++ pipeline/uploader/upload.go | 40 ++++++++++++++++++++++++++++++++ pipeline/uploader/upload_test.go | 1 + pipeline/uploader_test.go | 1 + 8 files changed, 86 insertions(+) create mode 100644 pipeline/uploader.go create mode 100644 pipeline/uploader/upload.go create mode 100644 pipeline/uploader/upload_test.go create mode 100644 pipeline/uploader_test.go diff --git a/client/client.go b/client/client.go index 39c8e63..e673d20 100644 --- a/client/client.go +++ b/client/client.go @@ -89,4 +89,7 @@ type Client interface { // Upload uploads the full logs to the server. Upload(ctx context.Context, step int64, lines []*drone.Line) error + + // UploadCard uploads a card to drone server. + UploadCard(ctx context.Context, step int64, card *drone.CardInput) error } diff --git a/client/http.go b/client/http.go index ea4fcb8..06f196a 100644 --- a/client/http.go +++ b/client/http.go @@ -30,6 +30,7 @@ const ( endpointWatch = "/rpc/v2/build/%d/watch" endpointBatch = "/rpc/v2/step/%d/logs/batch" endpointUpload = "/rpc/v2/step/%d/logs/upload" + endpointCard = "/rpc/v2/step/%d/card" ) var _ Client = (*HTTPClient)(nil) @@ -201,6 +202,13 @@ func (p *HTTPClient) Upload(ctx context.Context, step int64, lines []*drone.Line return err } +// UploadCard uploads a card to drone server. +func (p *HTTPClient) UploadCard(ctx context.Context, step int64, card *drone.CardInput) error { + uri := fmt.Sprintf(endpointCard, step) + _, err := p.retry(ctx, uri, "POST", &card, nil) + return err +} + func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) { for { res, err := p.do(ctx, path, method, in, out) diff --git a/pipeline/runtime/execer.go b/pipeline/runtime/execer.go index 37a1172..5fdc861 100644 --- a/pipeline/runtime/execer.go +++ b/pipeline/runtime/execer.go @@ -24,6 +24,7 @@ type Execer struct { engine Engine reporter pipeline.Reporter streamer pipeline.Streamer + uploader pipeline.Uploader sem *semaphore.Weighted } @@ -31,6 +32,7 @@ type Execer struct { func NewExecer( reporter pipeline.Reporter, streamer pipeline.Streamer, + uploader pipeline.Uploader, engine Engine, threads int64, ) *Execer { @@ -38,6 +40,7 @@ func NewExecer( reporter: reporter, streamer: streamer, engine: engine, + uploader: uploader, } if threads > 0 { // optional semaphore that limits the number of steps @@ -250,6 +253,15 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste result = multierror.Append(result, err) } + // stream card data to server if exists + file, _ := e.engine.StreamFile(ctx, copy, "/tmp/card.json") + if file != nil { + s := state.Find(step.GetName()) + err = e.uploader.UploadCard(ctx, file, s.ID) + if err != nil { + return nil + } + } // if the context was cancelled and returns a Canceled or // DeadlineExceeded error this indicates the pipeline was // cancelled. diff --git a/pipeline/runtime/type.go b/pipeline/runtime/type.go index 38fa4de..3aa66d0 100644 --- a/pipeline/runtime/type.go +++ b/pipeline/runtime/type.go @@ -84,6 +84,9 @@ type ( // Run runs the pipeline step. Run(context.Context, Spec, Step, io.Writer) (*State, error) + + // StreamFile copies a file to the server + StreamFile(context.Context, Step, string) (io.ReadCloser, error) } // Spec is an interface that must be implemented by all diff --git a/pipeline/uploader.go b/pipeline/uploader.go new file mode 100644 index 0000000..543928a --- /dev/null +++ b/pipeline/uploader.go @@ -0,0 +1,18 @@ +package pipeline + +import ( + "context" + "io" +) + +type Uploader interface { + UploadCard(ctx context.Context, r io.ReadCloser, step int64) error +} + +func NopUploader() Uploader { + return new(nopUploader) +} + +type nopUploader struct{} + +func (*nopUploader) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { return nil } diff --git a/pipeline/uploader/upload.go b/pipeline/uploader/upload.go new file mode 100644 index 0000000..2feb3b8 --- /dev/null +++ b/pipeline/uploader/upload.go @@ -0,0 +1,40 @@ +package uploader + +import ( + "context" + "encoding/json" + "io" + + "github.com/drone/drone-go/drone" + "github.com/drone/runner-go/client" + "github.com/drone/runner-go/pipeline" +) + +var _ pipeline.Uploader = (*Upload)(nil) + +type Upload struct { + client client.Client +} + +func New(client client.Client) *Upload { + return &Upload{ + client: client, + } +} + +func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { + bytes, err := io.ReadAll(r) + if err != nil { + return err + } + card := drone.CardInput{} + err = json.Unmarshal(bytes, &card) + if err != nil { + return err + } + err = s.client.UploadCard(ctx, step, &card) + if err != nil { + return err + } + return nil +} diff --git a/pipeline/uploader/upload_test.go b/pipeline/uploader/upload_test.go new file mode 100644 index 0000000..01aafa6 --- /dev/null +++ b/pipeline/uploader/upload_test.go @@ -0,0 +1 @@ +package uploader diff --git a/pipeline/uploader_test.go b/pipeline/uploader_test.go new file mode 100644 index 0000000..fb2071c --- /dev/null +++ b/pipeline/uploader_test.go @@ -0,0 +1 @@ +package pipeline