|
|
|
@ -57,13 +57,33 @@ func (e *Execer) Exec(ctx context.Context, spec Spec, state *pipeline.State) err
|
|
|
|
|
return e.reporter.ReportStage(noContext, state)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// create a new context with cancel in order to
|
|
|
|
|
// support fail failure when a step fails.
|
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
// create a directed graph, where each vertex in the graph
|
|
|
|
|
// is a pipeline step.
|
|
|
|
|
var d dag.Runner
|
|
|
|
|
for i := 0; i < spec.StepLen(); i++ {
|
|
|
|
|
step := spec.StepAt(i)
|
|
|
|
|
d.AddVertex(step.GetName(), func() error {
|
|
|
|
|
return e.exec(ctx, state, spec, step)
|
|
|
|
|
err := e.exec(ctx, state, spec, step)
|
|
|
|
|
// if the step is configured to fast fail the
|
|
|
|
|
// pipeline, and if the step returned a non-zero
|
|
|
|
|
// exit code, cancel the entire pipeline.
|
|
|
|
|
if step.GetErrPolicy() == ErrFailFast {
|
|
|
|
|
step := state.Find(step.GetName())
|
|
|
|
|
// reading data from the step is not thread
|
|
|
|
|
// safe so we need to acquire a lock.
|
|
|
|
|
state.Lock()
|
|
|
|
|
exit := step.ExitCode
|
|
|
|
|
state.Unlock()
|
|
|
|
|
if exit > 0 {
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|