initial commit
This commit is contained in:
commit
25f168189c
76
autobuffer.go
Normal file
76
autobuffer.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package autobuffer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Autobuffer[T any] struct {
|
||||
size int
|
||||
buffer []T
|
||||
action func(items []T)
|
||||
inputChan chan T
|
||||
flushChan chan bool
|
||||
stopChan chan bool
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
func New[T any](size int, action func(items []T)) *Autobuffer[T] {
|
||||
x := &Autobuffer[T]{
|
||||
size: size,
|
||||
buffer: make([]T, 0),
|
||||
action: action,
|
||||
inputChan: make(chan T),
|
||||
flushChan: make(chan bool),
|
||||
stopChan: make(chan bool),
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
|
||||
go func() {
|
||||
run := true
|
||||
for run {
|
||||
select {
|
||||
case <-x.stopChan:
|
||||
close(x.inputChan)
|
||||
close(x.flushChan)
|
||||
close(x.stopChan)
|
||||
if len(x.buffer) > 0 {
|
||||
x.mu.Lock()
|
||||
old := x.buffer
|
||||
x.buffer = make([]T, 0)
|
||||
x.mu.Unlock()
|
||||
action(old)
|
||||
}
|
||||
run = false
|
||||
case <-x.flushChan:
|
||||
x.mu.Lock()
|
||||
old := x.buffer
|
||||
x.buffer = make([]T, 0)
|
||||
x.mu.Unlock()
|
||||
action(old)
|
||||
case item := <-x.inputChan:
|
||||
x.mu.Lock()
|
||||
x.buffer = append(x.buffer, item)
|
||||
if len(x.buffer) >= x.size {
|
||||
old := x.buffer
|
||||
x.buffer = make([]T, 0)
|
||||
action(old)
|
||||
}
|
||||
x.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return x
|
||||
}
|
||||
|
||||
func (ab Autobuffer[T]) Insert(item T) {
|
||||
ab.inputChan <- item
|
||||
}
|
||||
|
||||
func (ab *Autobuffer[T]) Flush() {
|
||||
ab.flushChan <- true
|
||||
}
|
||||
|
||||
func (ab *Autobuffer[T]) Stop() {
|
||||
ab.stopChan <- true
|
||||
}
|
Loading…
Reference in New Issue
Block a user