autobuffer/autobuffer.go

77 lines
1.3 KiB
Go
Raw Normal View History

2023-07-07 13:35:00 +00:00
package autobuffer
import (
"sync"
)
// Autobuffer collects items of type T and hands them to a callback in batches.
//
// A single worker goroutine, started by New, owns the buffer. Insert and Flush
// talk to it over unbuffered channels, so each call blocks until the worker
// accepts the request. The callback runs on that worker goroutine; it must not
// call Insert or Flush on the same Autobuffer, because nothing would be
// receiving while the callback is running.
type Autobuffer[T any] struct {
	size      int             // buffered item count that triggers an automatic flush
	buffer    []T             // items collected since the last flush
	action    func(items []T) // callback that receives each batch
	inputChan chan T          // carries items from Insert to the worker
	flushChan chan bool       // carries Flush requests to the worker
	stopChan  chan bool       // closed (once) by Stop to request shutdown
	done      chan struct{}   // closed by the worker after its final flush
	stopOnce  *sync.Once      // makes Stop idempotent; pointer so copies share it
	mu        *sync.Mutex     // guards buffer
}

// New creates an Autobuffer and starts its worker goroutine.
//
// action is called with a batch as soon as the buffer holds at least size
// items; a size of zero or less therefore delivers every item on its own.
// action is also called on Flush and, if items remain, on Stop. The worker
// runs until Stop is called.
func New[T any](size int, action func(items []T)) *Autobuffer[T] {
	x := &Autobuffer[T]{
		size:      size,
		buffer:    make([]T, 0),
		action:    action,
		inputChan: make(chan T),
		flushChan: make(chan bool),
		stopChan:  make(chan bool),
		done:      make(chan struct{}),
		stopOnce:  &sync.Once{},
		mu:        &sync.Mutex{},
	}
	go x.run()
	return x
}

// run is the worker loop: it serves insert, flush and stop requests until
// stopChan is closed, then closes done.
func (ab *Autobuffer[T]) run() {
	defer close(ab.done)
	for {
		select {
		case <-ab.stopChan:
			// Final flush: unlike an explicit Flush, skip the callback when
			// there is nothing left to deliver.
			if batch := ab.take(); len(batch) > 0 {
				ab.action(batch)
			}
			return
		case <-ab.flushChan:
			// An explicit Flush always invokes the callback, even with an
			// empty batch.
			ab.action(ab.take())
		case item := <-ab.inputChan:
			if batch, full := ab.add(item); full {
				ab.action(batch)
			}
		}
	}
}

// add appends item to the buffer. When the buffer has reached size it is
// swapped for a fresh one and the filled batch is returned with true.
func (ab *Autobuffer[T]) add(item T) ([]T, bool) {
	ab.mu.Lock()
	defer ab.mu.Unlock()
	ab.buffer = append(ab.buffer, item)
	if len(ab.buffer) < ab.size {
		return nil, false
	}
	batch := ab.buffer
	ab.buffer = make([]T, 0)
	return batch, true
}

// take swaps the buffer for a fresh empty one and returns the old contents.
func (ab *Autobuffer[T]) take() []T {
	ab.mu.Lock()
	defer ab.mu.Unlock()
	batch := ab.buffer
	ab.buffer = make([]T, 0)
	return batch
}

// Insert hands item to the worker and blocks until the worker has accepted it.
// Once Stop has been called the item may be discarded instead; Insert then
// returns without blocking and never panics.
func (ab *Autobuffer[T]) Insert(item T) {
	select {
	case ab.inputChan <- item:
	case <-ab.stopChan:
	}
}

// Flush asks the worker to deliver the current buffer contents to the callback
// now. It returns once the worker has accepted the request, which is before
// the callback has run. After Stop it is a no-op.
func (ab *Autobuffer[T]) Flush() {
	select {
	case ab.flushChan <- true:
	case <-ab.stopChan:
	}
}

// Stop asks the worker to shut down. Any items still buffered are delivered to
// the callback as one final batch, after which the worker exits. Stop does not
// wait for that final delivery; receive from Done to wait for it. Stop is safe
// to call more than once and from multiple goroutines.
func (ab *Autobuffer[T]) Stop() {
	ab.stopOnce.Do(func() { close(ab.stopChan) })
}

// Done returns a channel that is closed after the worker has finished its
// final flush and exited.
func (ab *Autobuffer[T]) Done() <-chan struct{} {
	return ab.done
}