This commit is contained in:
zutto 2018-01-11 20:15:20 +02:00
commit a13e45a324
3 changed files with 234 additions and 0 deletions

9
README.md Normal file
View file

@ -0,0 +1,9 @@
*wip
# io.pipe that times out
this is a work-in-progress.
Purpose of this is to intentionally cause r/w to close after certain time.

110
TimeoutPipe.go Normal file
View file

@ -0,0 +1,110 @@
package TimeoutPipe
import (
"errors"
"io"
"time"
)
var TimeoutErr error = errors.New("Pipe has timed out!")
//TimeoutPipe is a io.pipe that times out.
//Purpose of this is to stop goroutine r/w deadlocks by intentionally causing error
//less code to implement on the potentially-deadlocking-r/w goroutines, no need to worry about channels or timers.
type TimeoutPipe struct {
BufferSize int
Reader *io.PipeReader
Writer *io.PipeWriter
internalReader *io.PipeReader
internalWriter *io.PipeWriter
timer *time.Timer
timeout time.Duration
}
//NewTimeoutPipe retusn a new instance of TimeoutPipe interface.
func NewTimeoutPipe() *TimeoutPipe {
r, Wr := io.Pipe()
Re, w := io.Pipe()
t := TimeoutPipe{
BufferSize: 1500,
internalReader: r,
internalWriter: w,
Reader: Re,
Writer: Wr,
}
return &t
}
//Pipe returns instance of io.pipereader & io.pipewriter
func (t *TimeoutPipe) Pipe(timeout time.Duration) (*io.PipeReader, *io.PipeWriter) {
t.timeout = timeout
t.rwProxy()
t.start()
return t.Reader, t.Writer
}
func (t *TimeoutPipe) start() {
t.timer = time.AfterFunc(t.timeout, func() {
t.closePipes(TimeoutErr)
})
}
func (t *TimeoutPipe) rwProxy() {
go func() {
var buffer []byte = make([]byte, t.BufferSize)
for {
var written int = 0
readLength, err := t.internalReader.Read(buffer)
if err != nil || readLength == 0 {
switch err {
case io.EOF:
t.internalWriter.Write(buffer[:readLength])
t.closePipes(err)
return
case io.ErrClosedPipe:
t.closePipes(err)
return
default:
t.closePipes(err)
return
}
}
t.ResetTimer()
for written < readLength {
w, err := t.internalWriter.Write(buffer[written:readLength])
written += w
if err != nil {
t.closePipes(err)
}
}
}
}()
}
func (t *TimeoutPipe) closePipes(e error) {
if e != nil {
t.internalReader.Close()
t.internalWriter.Close()
t.Reader.CloseWithError(e)
t.Writer.Close()
} else {
t.internalReader.Close()
t.internalWriter.Close()
t.Reader.Close()
t.Writer.Close()
}
}
//
func (t *TimeoutPipe) ResetTimer() {
t.timer.Stop()
t.timer.Reset(t.timeout)
}

115
TimeoutPipe_test.go Normal file
View file

@ -0,0 +1,115 @@
package TimeoutPipe
import (
"sync"
"testing"
"time"
)
var dummyData []byte = []byte("testing 12345")
var smallBuffer []byte = make([]byte, 2)
var largeBuffer []byte = make([]byte, 100)
func TestNoTimeout(t *testing.T) {
x := NewTimeoutPipe()
read, write := x.Pipe(time.Second * 5)
_, e := write.Write(dummyData)
if e != nil {
t.Error("Writer died unexpectedly!")
}
r, re := read.Read(largeBuffer)
if re != nil {
t.Error("Reader died unexpectedly!")
}
if r < len(dummyData) || r > len(dummyData) {
t.Error("unexpected write or read length")
}
}
//if this function gets stuck -- something is wrong.
func TestTimeout(t *testing.T) {
x := NewTimeoutPipe()
read, _ := x.Pipe(time.Second * 1)
_, re := read.Read(largeBuffer)
if re != nil {
//this is what we want
}
}
func TestBufferedData(t *testing.T) {
wg := sync.WaitGroup{}
x := NewTimeoutPipe()
read, write := x.Pipe(time.Second * 1)
wg.Add(1)
go func() {
_, e := write.Write(dummyData)
if e != nil {
return
t.Error("Writer died unexpectedly!")
}
wg.Done()
}()
wg.Add(1)
go func() {
for {
_, re := read.Read(smallBuffer)
if re != nil {
break
}
}
wg.Done()
}()
wg.Wait()
}
func TestWriteLargeData(t *testing.T) {
//we dont actually need to write large data, just set buffer size to very low ^^
x := NewTimeoutPipe()
x.BufferSize = 2
read, write := x.Pipe(time.Second * 5)
written := 0
go func() {
for {
_, re := read.Read(largeBuffer)
if re != nil {
break
t.Error("Reader died unexpectedly!")
}
}
}()
for written < len(dummyData) {
w, e := write.Write(dummyData)
if e != nil {
t.Error("Writer died unexpectedly!")
}
written += w
}
}
func TestReset(t *testing.T) {
ti := time.Now()
x := NewTimeoutPipe()
x.Pipe(time.Second * 1)
for i := 0; i < 3; i++ {
x.ResetTimer()
time.Sleep(time.Second * 1)
}
if time.Since(ti) < 1*time.Second {
t.Error("Timer was not reset properly!")
}
}