Merge pull request #16 from drone/feature/dron-101-card-2

read & upload card data to drone server
pull/17/head
Eoin McAfee 3 years ago committed by GitHub
commit 86c6a79bd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -89,4 +89,7 @@ type Client interface {
// Upload uploads the full logs to the server.
Upload(ctx context.Context, step int64, lines []*drone.Line) error
// UploadCard uploads a card to drone server.
UploadCard(ctx context.Context, step int64, card *drone.CardInput) error
}

@ -30,6 +30,7 @@ const (
endpointWatch = "/rpc/v2/build/%d/watch"
endpointBatch = "/rpc/v2/step/%d/logs/batch"
endpointUpload = "/rpc/v2/step/%d/logs/upload"
endpointCard = "/rpc/v2/step/%d/card"
)
var _ Client = (*HTTPClient)(nil)
@ -201,6 +202,13 @@ func (p *HTTPClient) Upload(ctx context.Context, step int64, lines []*drone.Line
return err
}
// UploadCard uploads a card to drone server.
func (p *HTTPClient) UploadCard(ctx context.Context, step int64, card *drone.CardInput) error {
uri := fmt.Sprintf(endpointCard, step)
_, err := p.retry(ctx, uri, "POST", &card, nil)
return err
}
func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) {
for {
res, err := p.do(ctx, path, method, in, out)

@ -8,7 +8,7 @@ require (
github.com/buildkite/yaml v2.1.0+incompatible
github.com/coreos/go-semver v0.3.0
github.com/docker/go-units v0.4.0
github.com/drone/drone-go v1.6.0
github.com/drone/drone-go v1.7.1
github.com/drone/envsubst v1.0.2
github.com/google/go-cmp v0.3.0
github.com/hashicorp/go-multierror v1.0.0
@ -17,4 +17,3 @@ require (
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)

@ -23,6 +23,8 @@ github.com/drone/drone-go v1.4.1-0.20201109202657-b9e58bbbcf27/go.mod h1:fxCf9jA
github.com/drone/drone-go v1.5.0 h1:4rM74O3Xd6SnkdRIidlwwhVAPs4dXvcdVCgGvkrqL1M=
github.com/drone/drone-go v1.5.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg=
github.com/drone/drone-go v1.6.0/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg=
github.com/drone/drone-go v1.7.1 h1:ZX+3Rs8YHUSUQ5mkuMLmm1zr1ttiiE2YGNxF3AnyDKw=
github.com/drone/drone-go v1.7.1/go.mod h1:fxCf9jAnXDZV1yDr0ckTuWd1intvcQwfJmTRpTZ1mXg=
github.com/drone/envsubst v1.0.2 h1:dpYLMAspQHW0a8dZpLRKe9jCNvIGZPhCPrycZzIHdqo=
github.com/drone/envsubst v1.0.2/go.mod h1:bkZbnc/2vh1M12Ecn7EYScpI4YGYU0etwLJICOWi8Z0=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=

@ -25,4 +25,5 @@ func MergeStep(src, dst *drone.Step) {
dst.Started = src.Started
dst.Stopped = src.Stopped
dst.Version = src.Version
dst.Schema = src.Schema
}

@ -0,0 +1,70 @@
package extractor
import (
"bytes"
"encoding/base64"
"encoding/json"
"io"
"os"
"regexp"
)
var (
prefix = []byte("\u001B]1338;")
suffix = []byte("\u001B]0m")
re = regexp.MustCompilePOSIX("\u001B]1338;((.*?)\u001B]0m)")
disableCards = os.Getenv("DRONE_FLAG_ENABLE_CARDS") == "false"
)
type Writer struct {
base io.Writer
file []byte
chunked bool
}
func New(w io.Writer) *Writer {
return &Writer{w, nil, false}
}
func (e *Writer) Write(p []byte) (n int, err error) {
if disableCards {
return e.base.Write(p)
}
if bytes.HasPrefix(p, prefix) == false && e.chunked == false {
return e.base.Write(p)
}
n = len(p)
// if the data does not include the ansi suffix,
// it exceeds the size of the buffer and is chunked.
e.chunked = !bytes.Contains(p, suffix)
// trim the ansi prefix and suffix from the data,
// and also trim any spacing or newlines that could
// cause confusion.
p = bytes.TrimSpace(p)
p = bytes.TrimPrefix(p, prefix)
p = bytes.TrimSuffix(p, suffix)
e.file = append(e.file, p...)
return n, nil
}
func (e *Writer) File() ([]byte, bool) {
if len(e.file) == 0 {
return nil, false
}
data, err := base64.StdEncoding.DecodeString(string(e.file))
if err != nil {
return nil, false
}
if isJSON(data) {
return data, true
}
return nil, false
}
func isJSON(data []byte) bool {
var js json.RawMessage
return json.Unmarshal(data, &js) == nil
}

@ -10,6 +10,7 @@ import (
"github.com/drone/drone-go/drone"
"github.com/drone/runner-go/environ"
"github.com/drone/runner-go/livelog/extractor"
"github.com/drone/runner-go/logger"
"github.com/drone/runner-go/pipeline"
@ -24,6 +25,7 @@ type Execer struct {
engine Engine
reporter pipeline.Reporter
streamer pipeline.Streamer
uploader pipeline.Uploader
sem *semaphore.Weighted
}
@ -31,6 +33,7 @@ type Execer struct {
func NewExecer(
reporter pipeline.Reporter,
streamer pipeline.Streamer,
uploader pipeline.Uploader,
engine Engine,
threads int64,
) *Execer {
@ -38,6 +41,7 @@ func NewExecer(
reporter: reporter,
streamer: streamer,
engine: engine,
uploader: uploader,
}
if threads > 0 {
// optional semaphore that limits the number of steps
@ -232,17 +236,20 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
wc := e.streamer.Stream(noContext, state, step.GetName())
wc = newReplacer(wc, secretSlice(step))
// wrap writer in extrator
ext := extractor.New(wc)
// if the step is configured as a daemon, it is detached
// from the main process and executed separately.
if step.IsDetached() {
go func() {
e.engine.Run(ctx, spec, copy, wc)
e.engine.Run(ctx, spec, copy, ext)
wc.Close()
}()
return nil
}
exited, err := e.engine.Run(ctx, spec, copy, wc)
exited, err := e.engine.Run(ctx, spec, copy, ext)
// close the stream. If the session is a remote session, the
// full log buffer is uploaded to the remote server.
@ -250,6 +257,16 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
result = multierror.Append(result, err)
}
// upload card if exists
card, ok := ext.File()
if ok {
err = e.uploader.UploadCard(ctx, card, state, step.GetName())
if err != nil {
log.Warnln("cannot upload card")
result = multierror.Append(result, err)
}
}
// if the context was cancelled and returns a Canceled or
// DeadlineExceeded error this indicates the pipeline was
// cancelled.

@ -0,0 +1,17 @@
package pipeline
import (
"context"
)
type Uploader interface {
UploadCard(context.Context, []byte, *State, string) error
}
func NopUploader() Uploader {
return new(nopUploader)
}
type nopUploader struct{}
func (*nopUploader) UploadCard(context.Context, []byte, *State, string) error { return nil }

@ -0,0 +1,48 @@
package uploader
import (
"context"
"encoding/json"
"github.com/drone/drone-go/drone"
"github.com/drone/runner-go/client"
"github.com/drone/runner-go/internal"
"github.com/drone/runner-go/pipeline"
)
var _ pipeline.Uploader = (*Upload)(nil)
type Upload struct {
client client.Client
}
func New(client client.Client) *Upload {
return &Upload{
client: client,
}
}
func (s *Upload) UploadCard(ctx context.Context, bytes []byte, state *pipeline.State, stepName string) error {
src := state.Find(stepName)
card := drone.CardInput{}
err := json.Unmarshal(bytes, &card)
if err != nil {
return err
}
err = s.client.UploadCard(ctx, src.ID, &card)
if err != nil {
return err
}
// update step schema
state.Lock()
src.Schema = card.Schema
cpy := internal.CloneStep(src)
state.Unlock()
err = s.client.UpdateStep(ctx, cpy)
if err == nil {
state.Lock()
internal.MergeStep(cpy, src)
state.Unlock()
}
return nil
}

@ -0,0 +1 @@
package pipeline
Loading…
Cancel
Save