-
Notifications
You must be signed in to change notification settings - Fork 1
/
flo.go
384 lines (338 loc) · 11.7 KB
/
flo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
// Package flo is designed to be an abstraction to the common worker pool pattern used in Go. How does it work? By being
// decently opinionated and making heavy use of reflection.
//
// The first thing you need to know about designing a flo is what kind of functions/methods it can work with. Flo's can
// consist of one of three function signatures:
//
// 1. func (context.Context) (R, error)
// 2. func (context.Context, T) (R, error)
// 3. func (context.Context, T) error
//
// One can only be used as the first step of a flo. It is meant to act as a step that produces data without an input from
// anywhere. Two can be used at any position in the flo, although if it is used as the first or last step in the flo
// some extra configuration is expected. Three can only be used as the last step of a flo. It is meant to act as a step
// that consumes data and does not send it along to anywhere else.
//
// Now let's break down the common parts of the step signatures. They all take in a context as their first parameter.
// This is the same context that is passed into the flo when BuildAndExecute is called. It is propagated throughout to
// support proper context cancellation. The second thing all of the signatures have in common is that they all return
// at least an error.
//
// Important: If an error is returned from a step in the flo, no result will be propagated to the next step, ending
// any data processing for that data stream.
//
// Lastly, you see that steps can optionally take in a T and optionally return an R. These are meant to be generic
// placeholders for concrete types.
//
// Imagine we have the following steps:
// func step1(ctx context.Context) (int, error) {...}
// func step2(ctx context.Context, i int) (string, error) {...}
// func step3(ctx context.Context, s string) error {...}
//
// We could construct and run a flo with these steps by doing:
// err := flo.NewBuilder().
// Add(step1).
// Add(step2).
// Add(step3).
// BuildAndExecute(context.Background())
//
// This would be an example of a valid flo. Notice how the output type of the previous step must match the input type of
// the following step (the T and R values mentioned above). If the types did not align properly the err that is returned
// would not be nil. The flo validates all types before it begins to process any data. A Validate method is also exposed
// from the flo.Builder should you want to validate your flo at test time.
//
// For more detailed examples, including how to configure a flo's parallelism and how to bridge a flo to other parts of
// your codebase, I recommend checking out the examples folder in this repo.
package flo
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
)
// Validation errors and message formats used by Builder.Validate and its channel-validation helpers.
var (
// errStepCnt is returned by Validate when fewer than two steps are registered.
errStepCnt = errors.New("must register at least two steps")
// errFirstStep is returned by Validate when the first step only consumes data (no R result).
errFirstStep = errors.New("first step must have a signature of func(context.Context) (R, error) or func(context.Context, T) (R, error)")
// errInteriorStep is returned by Validate when a step that is neither first nor last is not an in/out step.
errInteriorStep = errors.New("interior step must have a signature of func(context.Context, T) (R, error)")
// errLastStep is returned by Validate when the last step takes no T input. Note that Validate only rejects the
// producer-only signature here; a last step of func(context.Context, T) (R, error) is accepted.
errLastStep = errors.New("last step must have a signature of func(context.Context, T) error")
// errStepType is returned by Validate when typeOfStep cannot classify a registered step.
errStepType = errors.New("a Step must be func with one of the following signatures: func(context.Context) (R, error), func(context.Context, T) (R, error), or func(context.Context, T) error")
// errInputChType is returned when the registered input is not a channel, or is a send-only channel.
errInputChType = errors.New("a input channel must be of type <-chan T")
// errInputChStepType is returned when an input channel is registered but the first step is not an in/out step.
errInputChStepType = errors.New("a input channel should only be registered when first step is of type func(context.Context, T) (R, error)")
// errOutputChType is returned when the registered output is not a channel, or is a receive-only channel.
errOutputChType = errors.New("an output channel must be of type chan<- T")
// errOutputChStepType is returned when an output channel is registered but the last step is not an in/out step.
errOutputChStepType = errors.New("an output channel should only be registered when last step is of type func(context.Context, T) (R, error)")
// inputChTypeMismatchFmt takes the input channel's element type, then the first step's input type.
inputChTypeMismatchFmt = "input channels type %s does not match the first steps input type %s"
// outputChTypeMismatchFmt takes the output channel's element type, then the last step's output type.
outputChTypeMismatchFmt = "output channels type %s does not match the last steps output type %s"
// typeMismatchFmt takes the 1-based step number, the previous step's output type and the current step's input type.
typeMismatchFmt = "Step %d: previous steps output type %s does not match current steps input type %s"
)
// Builder is used to construct a flo(workflow). Steps are registered with Add and the flo is validated and run with
// BuildAndExecute.
type Builder struct {
// inCh is the optional caller-supplied input channel (set by WithInput). It is stored untyped and accessed via
// reflection.
inCh interface{}
// outCh is the optional caller-supplied output channel (set by WithOutput). It is stored untyped and accessed via
// reflection.
outCh interface{}
// realChan is the internal channel created by launchInputChannel that relays values received from inCh.
realChan chan interface{}
// steps holds the registered step runners in the order they were added.
steps []*stepRunner
// parallelism is the default value copied into each step runner by Add; NewBuilder initialises it to 1.
parallelism int
// errHandler is the default error handler copied into each step runner by Add; it may be nil.
errHandler func(error)
}
// Option is a functional option that configures a Builder. Options are applied, in order, by NewBuilder.
type Option func(*Builder)
// WithParallelism sets the Builder's default parallelism, which Add copies into every step registered afterwards.
// Values below one are clamped to one.
func WithParallelism(i int) Option {
	workers := i
	if workers < 1 {
		workers = 1
	}
	return func(builder *Builder) {
		builder.parallelism = workers
	}
}
// WithErrorHandler sets the Builder's default error handler, which Add copies into every step registered afterwards.
// It is intended for logging or auditing the errors that steps return.
func WithErrorHandler(handler ErrorHandler) Option {
	return func(builder *Builder) {
		builder.errHandler = handler
	}
}
// WithInput registers the channel that feeds the flo. The option is only valid when the first registered step has the
// signature func(context.Context, T) (R, error); ch must then be a receivable channel whose element type is T (or
// implements T when T is an interface). The channel is checked by Validate, not here.
func WithInput(ch interface{}) Option {
	return func(builder *Builder) {
		builder.inCh = ch
	}
}
// WithOutput registers the channel that receives the flo's final results, which is useful for bridging the flo to
// other code. The option is only valid when the last registered step has the signature
// func(context.Context, T) (R, error); ch must then be a sendable channel whose element type is R (or an interface R
// implements). The flo never closes this channel, and the caller must not close it until the flo has finished
// processing data.
func WithOutput(ch interface{}) Option {
	return func(builder *Builder) {
		builder.outCh = ch
	}
}
// NewBuilder returns a Builder with a default parallelism of one and then applies the given options in order. No data
// is processed until BuildAndExecute is called.
func NewBuilder(options ...Option) *Builder {
	builder := new(Builder)
	builder.parallelism = 1
	for _, apply := range options {
		apply(builder)
	}
	return builder
}
// Add appends a Step to the flo. The new step starts with the Builder's current default parallelism and error handler,
// after which the supplied step options are applied in order. The Builder is returned so calls can be chained.
func (b *Builder) Add(s Step, options ...StepOption) *Builder {
	runner := new(stepRunner)
	runner.step = s
	runner.parallelism = b.parallelism
	runner.wg = new(sync.WaitGroup)
	runner.errHandler = b.errHandler

	for _, configure := range options {
		configure(runner)
	}

	b.steps = append(b.steps, runner)
	return b
}
// BuildAndExecute validates the registered steps and, if validation succeeds, wires them together and starts them.
// A validation failure is returned immediately and no data is processed. Otherwise the call blocks until every step
// has shut down, which is expected to happen once the provided context is canceled or the registered input channel
// (if any) is closed, and then returns nil.
func (b *Builder) BuildAndExecute(ctx context.Context) error {
	if err := b.Validate(); err != nil {
		return err
	}

	for idx, runner := range b.steps {
		// Connect the step's input: later steps read from the previous step's output, the first step reads from
		// the bridged input channel when one was registered.
		switch {
		case idx > 0:
			runner.registerInput(b.steps[idx-1].output())
		case b.inCh != nil:
			runner.registerInput(b.launchInputChannel())
		}

		// Touch the output channel before the workers start so it is allocated up front (avoids a data race).
		if runner.sType != onlyIn {
			runner.output()
		}

		runner.start(ctx)
	}

	if b.outCh != nil {
		b.launchOutputChannel()
	}

	b.awaitShutdown()
	return nil
}
// Validate checks that the pipeline can process data: at least two steps are registered, every step has a supported
// signature for its position, each step's input type matches (or, for interface inputs, is implemented by) the
// previous step's output type, and any registered input/output channel is compatible with the first/last step.
// BuildAndExecute calls this method; it is exported mainly so callers can verify their pipeline in tests. As a side
// effect it records each step's detected type on its runner.
func (b *Builder) Validate() error {
	total := len(b.steps)
	if total < 2 {
		return errStepCnt
	}
	last := total - 1

	// upstream is the output type of the step preceding the one being checked.
	var upstream reflect.Type
	for idx, runner := range b.steps {
		kind := typeOfStep(runner.step)

		// positional rules
		switch {
		case kind == invalid:
			return errStepType
		case idx == 0 && kind == onlyIn:
			return errFirstStep
		case idx == last && kind == onlyOut:
			return errLastStep
		case idx > 0 && idx < last && kind != inOut:
			return errInteriorStep
		}
		runner.sType = kind

		// pull the T and R types out of the signature
		fn := reflect.TypeOf(runner.step)
		var in, out reflect.Type
		switch kind {
		case onlyOut:
			out = fn.Out(0)
		case inOut:
			in, out = fn.In(1), fn.Out(0)
		case onlyIn:
			in = fn.In(1)
		}

		// every step after the first must accept what its predecessor emits
		if idx > 0 {
			matches := upstream == in
			if in.Kind() == reflect.Interface {
				matches = upstream.Implements(in)
			}
			if !matches {
				return fmt.Errorf(typeMismatchFmt, idx+1, upstream, in)
			}
		}
		upstream = out
	}

	// validate the input channel against the first step
	if b.inCh != nil {
		if err := validateInputChannel(b.inCh, b.steps[0]); err != nil {
			return err
		}
	}

	// validate the output channel against the last step
	if b.outCh != nil {
		if err := validateOutputChannel(b.outCh, b.steps[last]); err != nil {
			return err
		}
	}

	return nil
}
// launchInputChannel bridges the caller's typed input channel to an internal chan interface{} with the same capacity.
// A goroutine receives from the caller's channel via reflection and forwards each value; when the caller's channel is
// closed the internal channel is closed too and the goroutine exits. The internal channel is stored on the Builder
// and returned.
// NOTE(review): the forwarding send is unconditional, so the goroutine blocks if nothing drains the internal channel.
func (b *Builder) launchInputChannel() chan interface{} {
	src := reflect.ValueOf(b.inCh)
	feed := make(chan interface{}, src.Cap())
	b.realChan = feed

	go func() {
		for item, open := src.Recv(); open; item, open = src.Recv() {
			feed <- item.Interface()
		}
		close(feed)
	}()

	return feed
}
// launchOutputChannel bridges the last step's output to the caller's typed output channel. A goroutine ranges over the
// last step's output and sends each value on the caller's channel via reflection; it exits when the step's output
// channel is closed. The caller's channel is never closed here.
//
// A nil interface value (for example when the last step's R is an interface type and the step returned nil) has no
// reflect representation: reflect.ValueOf(nil) is the zero Value and Value.Send panics on it. Such values are sent as
// the zero value of the channel's element type instead, so the receiver observes the nil.
func (b *Builder) launchOutputChannel() {
	sink := reflect.ValueOf(b.outCh)
	elemType := sink.Type().Elem()
	results := b.steps[len(b.steps)-1].output()

	go func() {
		for result := range results {
			val := reflect.ValueOf(result)
			if !val.IsValid() {
				val = reflect.Zero(elemType)
			}
			sink.Send(val)
		}
	}()
}
// validateInputChannel checks that inCh can feed the first step sr. It returns errInputChStepType when sr is not an
// in/out step, errInputChType when inCh is not a channel or is send-only, and a formatted mismatch error when the
// channel's element type neither equals the step's input type nor (for interface inputs) implements it.
func validateInputChannel(inCh interface{}, sr *stepRunner) error {
	if sr.sType != inOut {
		return errInputChStepType
	}

	// must be a channel we can receive from; ChanDir is only consulted once Kind is known to be Chan
	chType := reflect.TypeOf(inCh)
	if chType.Kind() != reflect.Chan || chType.ChanDir() == reflect.SendDir {
		return errInputChType
	}

	elem := chType.Elem()
	want := reflect.TypeOf(sr.step).In(1)

	var compatible bool
	if want.Kind() == reflect.Interface {
		compatible = elem.Implements(want)
	} else {
		compatible = elem == want
	}
	if !compatible {
		return fmt.Errorf(inputChTypeMismatchFmt, elem, want)
	}
	return nil
}
// validateOutputChannel checks that outCh can receive the results of the last step sr. It returns errOutputChStepType
// when sr is not an in/out step, errOutputChType when outCh is not a channel or is receive-only, and a formatted
// mismatch error when the step's output type neither equals the channel's element type nor (for interface element
// types) implements it.
func validateOutputChannel(outCh interface{}, sr *stepRunner) error {
	if sr.sType != inOut {
		return errOutputChStepType
	}

	// must be a channel we can send on; ChanDir is only consulted once Kind is known to be Chan
	chType := reflect.TypeOf(outCh)
	if chType.Kind() != reflect.Chan || chType.ChanDir() == reflect.RecvDir {
		return errOutputChType
	}

	elem := chType.Elem()
	produced := reflect.TypeOf(sr.step).Out(0)

	var compatible bool
	if elem.Kind() == reflect.Interface {
		compatible = produced.Implements(elem)
	} else {
		compatible = elem == produced
	}
	if !compatible {
		return fmt.Errorf(outputChTypeMismatchFmt, elem, produced)
	}
	return nil
}
// typeOfStep classifies a step by inspecting its function signature with reflection. It returns onlyOut for
// func(context.Context) (R, error), inOut for func(context.Context, T) (R, error), onlyIn for
// func(context.Context, T) error, and invalid for nil, non-function values and every other signature.
func typeOfStep(s Step) stepType {
	if s == nil {
		return invalid
	}
	fn := reflect.TypeOf(s)
	if fn.Kind() != reflect.Func {
		return invalid
	}

	ctxType := reflect.TypeOf((*context.Context)(nil)).Elem()
	errType := reflect.TypeOf((*error)(nil)).Elem()
	ins, outs := fn.NumIn(), fn.NumOut()

	switch {
	case ins < 1 || ins > 2, outs < 1 || outs > 2:
		// one or two parameters and one or two results are required
		return invalid
	case ins == 1 && outs == 1:
		// func(ctx) error neither consumes nor produces data
		return invalid
	case fn.In(0) != ctxType:
		return invalid
	case fn.Out(outs-1) != errType:
		// the final result must always be error
		return invalid
	}

	switch {
	case ins == 1:
		return onlyOut
	case outs == 1:
		return onlyIn
	default:
		return inOut
	}
}
// awaitShutdown calls awaitShutdown on every registered step, in registration order, returning once all have returned.
func (b *Builder) awaitShutdown() {
	for _, runner := range b.steps {
		runner.awaitShutdown()
	}
}