|
|
|
@ -142,6 +142,8 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
|
|
|
|
|
ctx = logger.WithContext(ctx, log)
|
|
|
|
|
|
|
|
|
|
if e.sem != nil {
|
|
|
|
|
log.Trace("acquiring semaphore")
|
|
|
|
|
|
|
|
|
|
// the semaphore limits the number of steps that can run
|
|
|
|
|
// concurrently. acquire the semaphore and release when
|
|
|
|
|
// the pipeline completes.
|
|
|
|
@ -152,6 +154,7 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
|
|
|
|
|
// state should be canceled.
|
|
|
|
|
switch ctx.Err() {
|
|
|
|
|
case context.Canceled, context.DeadlineExceeded:
|
|
|
|
|
log.Trace("acquiring semaphore canceled")
|
|
|
|
|
state.Cancel()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -172,6 +175,7 @@ func (e *Execer) exec(ctx context.Context, state *pipeline.State, spec Spec, ste
|
|
|
|
|
}
|
|
|
|
|
// release the semaphore
|
|
|
|
|
e.sem.Release(1)
|
|
|
|
|
log.Trace("semaphore released")
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|