package net
import (
"io"
"os"
"sync"
"time"
)
// pipeDeadline tracks a single deadline and exposes its expiry as a channel
// that gets closed.
type pipeDeadline struct {
	// mu is held by set and wait while they touch timer and cancel.
	mu sync.Mutex
	// timer is the pending expiry timer, or nil when none is scheduled.
	timer *time.Timer
	// cancel is closed once the deadline has passed; set swaps in a fresh
	// channel when a passed deadline is cleared or moved into the future.
	cancel chan struct{}
}
// makePipeDeadline returns a pipeDeadline with an open cancel channel and no
// timer scheduled.
func makePipeDeadline() pipeDeadline {
	open := make(chan struct{})
	return pipeDeadline{cancel: open}
}
// set replaces the current deadline with t.
//
// A zero t means "no deadline". A t in the future schedules a timer that
// closes the cancel channel when it fires. A t that is not in the future
// closes the cancel channel immediately. When the cancel channel is already
// closed and the new t is zero or in the future, a fresh open channel is
// installed so that waiters obtained afterwards block again.
func (d *pipeDeadline) set(t time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Retire any previously scheduled timer. If it could not be stopped it
	// has already fired, so block until its callback has closed cancel.
	if pending := d.timer; pending != nil && !pending.Stop() {
		<-d.cancel
	}
	d.timer = nil

	expired := isClosedChan(d.cancel)

	// No deadline requested: just make sure cancel is open.
	if t.IsZero() {
		if expired {
			d.cancel = make(chan struct{})
		}
		return
	}

	remaining := time.Until(t)
	if remaining <= 0 {
		// The deadline is already behind us; signal it unless that has
		// been done before.
		if !expired {
			close(d.cancel)
		}
		return
	}

	// The deadline lies ahead: reopen cancel if needed and arm the timer.
	if expired {
		d.cancel = make(chan struct{})
	}
	d.timer = time.AfterFunc(remaining, func() {
		close(d.cancel)
	})
}
// wait returns the current cancel channel, read under the mutex. The channel
// is closed when the deadline passes.
func (d *pipeDeadline) wait() chan struct{} {
	d.mu.Lock()
	current := d.cancel
	d.mu.Unlock()
	return current
}
// isClosedChan reports, without blocking, whether a receive on c can proceed
// right now. For the signalling channels in this file, which are only ever
// closed and never sent on, that means c has been closed.
func isClosedChan(c <-chan struct{}) bool {
	ready := false
	select {
	case <-c:
		ready = true
	default:
		// Receive would block: the channel is still open (or nil).
	}
	return ready
}
// pipeAddr is the address value reported by both ends of a pipe.
type pipeAddr struct{}

// Network returns the network name, which is always "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the textual form of the address, which is always "pipe".
func (pipeAddr) String() string {
	return "pipe"
}
// pipe is one end of an in-memory connection built by Pipe.
type pipe struct {
	// wrMu is held for the duration of write, so concurrent writes on the
	// same end run one after another.
	wrMu sync.Mutex

	// Read side: read receives a buffer on rdRx and then reports the
	// number of bytes it copied on rdTx.
	rdRx <-chan []byte
	rdTx chan<- int

	// Write side: write sends a buffer on wrTx and then receives the
	// number of bytes consumed on wrRx.
	wrTx chan<- []byte
	wrRx <-chan int

	// once ensures Close closes localDone only one time.
	once sync.Once
	// localDone is closed when this end is closed.
	localDone chan struct{}
	// remoteDone is the other end's localDone.
	remoteDone <-chan struct{}

	// Deadlines consulted by read and write respectively.
	readDeadline  pipeDeadline
	writeDeadline pipeDeadline
}
// Pipe returns the two ends of an in-memory connection.
//
// The ends are cross-wired over unbuffered channels: what one end sends on
// its write channel the other end receives on its read channel, and each
// end's localDone is the other end's remoteDone. Each end gets its own read
// and write deadlines.
func Pipe() (Conn, Conn) {
	var (
		// Buffers travelling towards the first / second end.
		toFirst  = make(chan []byte)
		toSecond = make(chan []byte)
		// Byte counts sent back by the first / second end's read.
		ackFromFirst  = make(chan int)
		ackFromSecond = make(chan int)
		// Closed when the first / second end is closed.
		firstDone  = make(chan struct{})
		secondDone = make(chan struct{})
	)

	first := &pipe{
		rdRx:          toFirst,
		rdTx:          ackFromFirst,
		wrTx:          toSecond,
		wrRx:          ackFromSecond,
		localDone:     firstDone,
		remoteDone:    secondDone,
		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),
	}
	second := &pipe{
		rdRx:          toSecond,
		rdTx:          ackFromSecond,
		wrTx:          toFirst,
		wrRx:          ackFromFirst,
		localDone:     secondDone,
		remoteDone:    firstDone,
		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),
	}
	return first, second
}
// LocalAddr returns the address of this end, which is always a pipeAddr.
func (*pipe) LocalAddr() Addr {
	return pipeAddr{}
}
// RemoteAddr returns the address of the other end, which is always a pipeAddr.
func (*pipe) RemoteAddr() Addr {
	return pipeAddr{}
}
// Read copies data handed over by the other end's Write into b and returns
// the number of bytes copied.
//
// io.EOF and io.ErrClosedPipe are returned as-is; any other error from the
// underlying read is wrapped in an *OpError with Op "read" and Net "pipe".
func (p *pipe) Read(b []byte) (int, error) {
	n, err := p.read(b)
	switch err {
	case nil, io.EOF, io.ErrClosedPipe:
		return n, err
	}
	return n, &OpError{Op: "read", Net: "pipe", Err: err}
}
// read performs one receive from the other end and copies it into b.
//
// It returns io.ErrClosedPipe if this end is closed, io.EOF if the other end
// is closed, and os.ErrDeadlineExceeded if the read deadline has passed.
// Those conditions are checked up front, in that order, and are also watched
// while waiting for data.
func (p *pipe) read(b []byte) (n int, err error) {
	// Fail fast with a fixed priority before blocking.
	if isClosedChan(p.localDone) {
		return 0, io.ErrClosedPipe
	}
	if isClosedChan(p.remoteDone) {
		return 0, io.EOF
	}
	if isClosedChan(p.readDeadline.wait()) {
		return 0, os.ErrDeadlineExceeded
	}

	select {
	case chunk := <-p.rdRx:
		// Tell the writer how much was taken so it can advance.
		copied := copy(b, chunk)
		p.rdTx <- copied
		return copied, nil
	case <-p.localDone:
		return 0, io.ErrClosedPipe
	case <-p.remoteDone:
		return 0, io.EOF
	case <-p.readDeadline.wait():
		return 0, os.ErrDeadlineExceeded
	}
}
// Write hands b to the other end's Read and returns the number of bytes that
// were consumed.
//
// io.ErrClosedPipe is returned as-is; any other error from the underlying
// write is wrapped in an *OpError with Op "write" and Net "pipe".
func (p *pipe) Write(b []byte) (int, error) {
	n, err := p.write(b)
	switch err {
	case nil, io.ErrClosedPipe:
		return n, err
	}
	return n, &OpError{Op: "write", Net: "pipe", Err: err}
}
// write offers b to the other end until all of it has been consumed.
//
// It returns io.ErrClosedPipe if either end is closed and
// os.ErrDeadlineExceeded if the write deadline has passed. Those conditions
// are checked up front and again on every hand-off attempt; on failure n is
// the number of bytes consumed so far.
func (p *pipe) write(b []byte) (n int, err error) {
	// Fail fast with a fixed priority before taking the write lock.
	if isClosedChan(p.localDone) {
		return 0, io.ErrClosedPipe
	}
	if isClosedChan(p.remoteDone) {
		return 0, io.ErrClosedPipe
	}
	if isClosedChan(p.writeDeadline.wait()) {
		return 0, os.ErrDeadlineExceeded
	}

	// Held for the whole call so another write on this end cannot slip in
	// between the pieces of b.
	p.wrMu.Lock()
	defer p.wrMu.Unlock()

	// The hand-off is attempted at least once, even when b is empty, and is
	// repeated for as long as unconsumed bytes remain.
	for {
		select {
		case p.wrTx <- b:
			taken := <-p.wrRx
			b = b[taken:]
			n += taken
		case <-p.localDone:
			return n, io.ErrClosedPipe
		case <-p.remoteDone:
			return n, io.ErrClosedPipe
		case <-p.writeDeadline.wait():
			return n, os.ErrDeadlineExceeded
		}
		if len(b) == 0 {
			break
		}
	}
	return n, nil
}
// SetDeadline applies t to both the read and the write deadline.
// It returns io.ErrClosedPipe, changing nothing, if either end is closed.
func (p *pipe) SetDeadline(t time.Time) error {
	if isClosedChan(p.localDone) {
		return io.ErrClosedPipe
	}
	if isClosedChan(p.remoteDone) {
		return io.ErrClosedPipe
	}
	p.readDeadline.set(t)
	p.writeDeadline.set(t)
	return nil
}
// SetReadDeadline applies t to the read deadline only.
// It returns io.ErrClosedPipe, changing nothing, if either end is closed.
func (p *pipe) SetReadDeadline(t time.Time) error {
	if isClosedChan(p.localDone) {
		return io.ErrClosedPipe
	}
	if isClosedChan(p.remoteDone) {
		return io.ErrClosedPipe
	}
	p.readDeadline.set(t)
	return nil
}
// SetWriteDeadline applies t to the write deadline only.
// It returns io.ErrClosedPipe, changing nothing, if either end is closed.
func (p *pipe) SetWriteDeadline(t time.Time) error {
	if isClosedChan(p.localDone) {
		return io.ErrClosedPipe
	}
	if isClosedChan(p.remoteDone) {
		return io.ErrClosedPipe
	}
	p.writeDeadline.set(t)
	return nil
}
// Close marks this end as closed by closing localDone. The close happens at
// most once, so repeated calls are harmless. It always returns nil.
func (p *pipe) Close() error {
	shutdown := func() {
		close(p.localDone)
	}
	p.once.Do(shutdown)
	return nil
}
// The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.