// Package autobuffer provides a generic buffer that collects items and hands
// them to a callback in batches.
package autobuffer

import (
	"sync"
)

// Autobuffer accumulates items of type T and passes them to an action
// callback in batches. A batch is delivered when the number of buffered items
// reaches the configured size, when Flush is called, or when Stop is called.
//
// All buffering and every call to the action happen on a single background
// goroutine started by New, so the action is never invoked concurrently with
// itself and batches are delivered in insertion order.
type Autobuffer[T any] struct {
	size      int             // number of buffered items that triggers an automatic flush
	buffer    []T             // items waiting to be flushed; guarded by mu
	action    func(items []T) // callback that receives each flushed batch
	inputChan chan T          // carries items from Insert to the worker
	flushChan chan bool       // carries Flush requests to the worker
	stopChan  chan bool       // carries Stop requests to the worker
	done      chan struct{}   // closed by the worker once the final flush has finished
	mu        *sync.Mutex     // guards buffer
}

// New creates an Autobuffer and starts its background worker.
//
// size is the number of buffered items at which the buffer is flushed
// automatically; with size <= 1 every inserted item is delivered on its own.
// action receives each non-empty batch and must be non-nil. It runs on the
// worker goroutine, so it must not call Insert, Flush or Stop on the same
// Autobuffer (that would deadlock).
//
// Call Stop to deliver any remaining items and terminate the worker.
func New[T any](size int, action func(items []T)) *Autobuffer[T] {
	ab := &Autobuffer[T]{
		size:      size,
		action:    action,
		inputChan: make(chan T),
		flushChan: make(chan bool),
		stopChan:  make(chan bool),
		done:      make(chan struct{}),
		mu:        &sync.Mutex{},
	}
	go ab.run()
	return ab
}

// run is the worker loop: it serializes inserts, flush requests and the stop
// request, and closes done when it exits.
func (ab *Autobuffer[T]) run() {
	// Closing done (instead of closing the request channels) lets callers
	// detect shutdown without ever risking a send on a closed channel.
	defer close(ab.done)
	for {
		select {
		case <-ab.stopChan:
			ab.flush()
			return
		case <-ab.flushChan:
			ab.flush()
		case item := <-ab.inputChan:
			if ab.add(item) >= ab.size {
				ab.flush()
			}
		}
	}
}

// add appends item to the pending buffer and returns the new buffer length.
func (ab *Autobuffer[T]) add(item T) int {
	ab.mu.Lock()
	defer ab.mu.Unlock()
	ab.buffer = append(ab.buffer, item)
	return len(ab.buffer)
}

// flush detaches the pending items and, if there are any, passes them to the
// action. The action is called without holding the lock.
func (ab *Autobuffer[T]) flush() {
	ab.mu.Lock()
	items := ab.buffer
	ab.buffer = nil
	ab.mu.Unlock()

	if len(items) > 0 {
		ab.action(items)
	}
}

// Insert hands item to the worker, blocking until the worker has accepted it.
// After Stop the item is silently discarded.
//
// The value receiver is kept for compatibility with existing callers; it is
// safe because only channel fields, which are shared between copies, are used.
func (ab Autobuffer[T]) Insert(item T) {
	select {
	case ab.inputChan <- item:
	case <-ab.done:
	}
}

// Flush asks the worker to deliver the currently buffered items. It returns
// once the worker has accepted the request; the action may still be running
// at that point. Flushing an empty buffer does not call the action, and Flush
// after Stop is a no-op.
func (ab *Autobuffer[T]) Flush() {
	select {
	case ab.flushChan <- true:
	case <-ab.done:
	}
}

// Stop delivers any remaining items, terminates the worker and returns once
// the final call to the action has completed. It is safe to call Stop more
// than once and from multiple goroutines.
func (ab *Autobuffer[T]) Stop() {
	select {
	case ab.stopChan <- true:
		// Wait for the final flush so no batch is lost if the caller exits
		// right after Stop returns.
		<-ab.done
	case <-ab.done:
	}
}