add uploader to support streaming a card to the server

pull/16/head
Eoin McAfee 3 years ago
parent f1ca9ee103
commit 1658440152

@ -89,4 +89,7 @@ type Client interface {
// Upload uploads the full logs to the server.
Upload(ctx context.Context, step int64, lines []*drone.Line) error
// UploadCard uploads a card to drone server.
UploadCard(ctx context.Context, step int64, card *drone.CardInput) error
}

@ -30,6 +30,7 @@ const (
endpointWatch = "/rpc/v2/build/%d/watch"
endpointBatch = "/rpc/v2/step/%d/logs/batch"
endpointUpload = "/rpc/v2/step/%d/logs/upload"
endpointCard = "/rpc/v2/step/%d/card"
)
var _ Client = (*HTTPClient)(nil)
@ -201,6 +202,13 @@ func (p *HTTPClient) Upload(ctx context.Context, step int64, lines []*drone.Line
return err
}
// UploadCard uploads a card to drone server.
func (p *HTTPClient) UploadCard(ctx context.Context, step int64, card *drone.CardInput) error {
uri := fmt.Sprintf(endpointCard, step)
_, err := p.retry(ctx, uri, "POST", &card, nil)
return err
}
func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) {
for {
res, err := p.do(ctx, path, method, in, out)

@ -24,6 +24,7 @@ type Execer struct {
engine Engine
reporter pipeline.Reporter
streamer pipeline.Streamer
uploader pipeline.Uploader
sem *semaphore.Weighted
}
@ -31,6 +32,7 @@ type Execer struct {
func NewExecer(
reporter pipeline.Reporter,
streamer pipeline.Streamer,
uploader pipeline.Uploader,
engine Engine,
threads int64,
) *Execer {
@ -38,6 +40,7 @@ func NewExecer(
reporter: reporter,
streamer: streamer,
engine: engine,
uploader: uploader,
}
if threads > 0 {
// optional semaphore that limits the number of steps
@ -250,6 +253,15 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
result = multierror.Append(result, err)
}
// stream card data to server if exists
file, _ := e.engine.StreamFile(ctx, copy, "/tmp/card.json")
if file != nil {
s := state.Find(step.GetName())
err = e.uploader.UploadCard(ctx, file, s.ID)
if err != nil {
return nil
}
}
// if the context was cancelled and returns a Canceled or
// DeadlineExceeded error this indicates the pipeline was
// cancelled.

@ -84,6 +84,9 @@ type (
// Run runs the pipeline step.
Run(context.Context, Spec, Step, io.Writer) (*State, error)
// StreamFile copies a file to the server
StreamFile(context.Context, Step, string) (io.ReadCloser, error)
}
// Spec is an interface that must be implemented by all

@ -0,0 +1,18 @@
package pipeline
import (
"context"
"io"
)
type Uploader interface {
UploadCard(ctx context.Context, r io.ReadCloser, step int64) error
}
func NopUploader() Uploader {
return new(nopUploader)
}
type nopUploader struct{}
func (*nopUploader) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error { return nil }

@ -0,0 +1,40 @@
package uploader
import (
"context"
"encoding/json"
"io"
"github.com/drone/drone-go/drone"
"github.com/drone/runner-go/client"
"github.com/drone/runner-go/pipeline"
)
var _ pipeline.Uploader = (*Upload)(nil)
type Upload struct {
client client.Client
}
func New(client client.Client) *Upload {
return &Upload{
client: client,
}
}
func (s *Upload) UploadCard(ctx context.Context, r io.ReadCloser, step int64) error {
bytes, err := io.ReadAll(r)
if err != nil {
return err
}
card := drone.CardInput{}
err = json.Unmarshal(bytes, &card)
if err != nil {
return err
}
err = s.client.UploadCard(ctx, step, &card)
if err != nil {
return err
}
return nil
}

@ -0,0 +1 @@
package pipeline
Loading…
Cancel
Save