add single flight queue polling
parent
c23a30094d
commit
e561a1a456
@ -0,0 +1,52 @@
|
||||
// Copyright 2019 Drone.IO Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform License
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
|
||||
"github.com/drone/drone-go/drone"
|
||||
)
|
||||
|
||||
var _ Client = (*SingleFlight)(nil)
|
||||
|
||||
// SingleFlight wraps a Client and limits to a single in-flight
|
||||
// request to pull items from the queue.
|
||||
type SingleFlight struct {
|
||||
Client
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewSingleFlight returns a Client that is limited to a single in-flight
|
||||
// request to pull items from the queue.
|
||||
func NewSingleFlight(endpoint, secret string, skipverify bool) *SingleFlight {
|
||||
return &SingleFlight{Client: New(endpoint, secret, skipverify)}
|
||||
}
|
||||
|
||||
// Request requests the next available build stage for execution.
|
||||
func (t *SingleFlight) Request(ctx context.Context, args *Filter) (*drone.Stage, error) {
|
||||
// if the context is canceled there is no need to make
|
||||
// the request and we can exit early.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
// if is critical to unlock the mutex when the function
|
||||
// exits. although a panic is unlikely it is critical that
|
||||
// we recover from the panic to avoid deadlock.
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
debug.PrintStack()
|
||||
}
|
||||
t.mu.Unlock()
|
||||
}()
|
||||
// lock the mutex to ensure only a single in-flight
|
||||
// request to request a resource from the server queue.
|
||||
t.mu.Lock()
|
||||
return t.Client.Request(ctx, args)
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
// Copyright 2019 Drone.IO Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform License
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/drone/drone-go/drone"
|
||||
)
|
||||
|
||||
var noContext = context.Background()
|
||||
|
||||
func TestSingleFlight(t *testing.T) {
|
||||
mock := &mockRequestClient{
|
||||
out: &drone.Stage{},
|
||||
err: errors.New("some random error"),
|
||||
}
|
||||
client := NewSingleFlight("", "", false)
|
||||
client.Client = mock
|
||||
out, err := client.Request(noContext, nil)
|
||||
if got, want := out, mock.out; got != want {
|
||||
t.Errorf("Expect stage returned from request")
|
||||
}
|
||||
if got, want := err, mock.err; got != want {
|
||||
t.Errorf("Expect error returned from request")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleFlightPanic(t *testing.T) {
|
||||
mock := &mockRequestClientPanic{}
|
||||
client := NewSingleFlight("", "", false)
|
||||
client.Client = mock
|
||||
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
t.Errorf("Expect Request to recover from panic")
|
||||
}
|
||||
client.mu.Lock()
|
||||
client.mu.Unlock()
|
||||
}()
|
||||
|
||||
client.Request(noContext, nil)
|
||||
}
|
||||
|
||||
func TestSingleFlightCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(noContext)
|
||||
cancel()
|
||||
client := NewSingleFlight("", "", false)
|
||||
client.Request(ctx, nil)
|
||||
}
|
||||
|
||||
// mock client that returns a static stage and error
|
||||
// from the request method.
|
||||
type mockRequestClient struct {
|
||||
Client
|
||||
|
||||
out *drone.Stage
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockRequestClient) Request(ctx context.Context, args *Filter) (*drone.Stage, error) {
|
||||
return m.out, m.err
|
||||
}
|
||||
|
||||
// mock client that returns panics when the request
|
||||
// method is invoked.
|
||||
type mockRequestClientPanic struct {
|
||||
Client
|
||||
}
|
||||
|
||||
func (m *mockRequestClientPanic) Request(ctx context.Context, args *Filter) (*drone.Stage, error) {
|
||||
panic("method not implemented")
|
||||
}
|
Loading…
Reference in New Issue