|
|
|
@ -145,10 +145,24 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
|
|
|
|
|
// the semaphore limits the number of steps that can run
|
|
|
|
|
// concurrently. acquire the semaphore and release when
|
|
|
|
|
// the pipeline completes.
|
|
|
|
|
if err := e.sem.Acquire(ctx, 1); err != nil {
|
|
|
|
|
err := e.sem.Acquire(ctx, 1)
|
|
|
|
|
|
|
|
|
|
// if acquiring the semaphore failed because the context
|
|
|
|
|
// deadline exceeded (e.g. the pipeline timed out) the
|
|
|
|
|
// state should be canceled.
|
|
|
|
|
switch ctx.Err() {
|
|
|
|
|
case context.Canceled, context.DeadlineExceeded:
|
|
|
|
|
state.Cancel()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if acquiring the semaphore failed for unexpected reasons
|
|
|
|
|
// the pipeline should error.
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Errorln("failed to acquire semaphore.")
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
// recover from a panic to ensure the semaphore is
|
|
|
|
|
// released to prevent deadlock. we do not expect a
|
|
|
|
|