From 25f168189c45c12cdc0b528d3800a15b9895f060 Mon Sep 17 00:00:00 2001 From: Nyan Date: Fri, 7 Jul 2023 16:35:00 +0300 Subject: [PATCH] initial commit --- autobuffer.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 3 ++ 2 files changed, 79 insertions(+) create mode 100644 autobuffer.go create mode 100644 go.mod diff --git a/autobuffer.go b/autobuffer.go new file mode 100644 index 0000000..1b50bf6 --- /dev/null +++ b/autobuffer.go @@ -0,0 +1,76 @@ +package autobuffer + +import ( + "sync" +) + +type Autobuffer[T any] struct { + size int + buffer []T + action func(items []T) + inputChan chan T + flushChan chan bool + stopChan chan bool + mu *sync.Mutex +} + +func New[T any](size int, action func(items []T)) *Autobuffer[T] { + x := &Autobuffer[T]{ + size: size, + buffer: make([]T, 0), + action: action, + inputChan: make(chan T), + flushChan: make(chan bool), + stopChan: make(chan bool), + mu: &sync.Mutex{}, + } + + go func() { + run := true + for run { + select { + case <-x.stopChan: + close(x.inputChan) + close(x.flushChan) + close(x.stopChan) + if len(x.buffer) > 0 { + x.mu.Lock() + old := x.buffer + x.buffer = make([]T, 0) + x.mu.Unlock() + action(old) + } + run = false + case <-x.flushChan: + x.mu.Lock() + old := x.buffer + x.buffer = make([]T, 0) + x.mu.Unlock() + action(old) + case item := <-x.inputChan: + x.mu.Lock() + x.buffer = append(x.buffer, item) + if len(x.buffer) >= x.size { + old := x.buffer + x.buffer = make([]T, 0) + action(old) + } + x.mu.Unlock() + } + } + }() + + return x +} + +func (ab Autobuffer[T]) Insert(item T) { + ab.inputChan <- item +} + +func (ab *Autobuffer[T]) Flush() { + ab.flushChan <- true +} + +func (ab *Autobuffer[T]) Stop() { + ab.stopChan <- true +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e94f703 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.nyan.su/nyan/autobuffer + +go 1.20