package websocket
import (
"context"
"fmt"
"io"
"math"
"net"
"sync/atomic"
"time"
)
// NetConn adapts c to the net.Conn interface.
//
// Each call to Write on the returned connection sends p as a single message
// of type msgType. Read yields the payload bytes of incoming messages; a
// message whose type differs from msgType causes the connection to be closed
// with StatusUnsupportedData and the read to fail.
//
// The read and write contexts are both derived from ctx, so cancelling ctx
// cancels every in-flight and future operation on the returned connection.
//
// Deadlines are implemented with one timer per direction. When a timer fires
// and the matching lock cannot be taken (Read/Write hold it for their whole
// duration), the direction's context is cancelled; otherwise the direction is
// flagged as expired so later calls fail with context.DeadlineExceeded until
// the deadline is set again.
func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn {
	// NOTE(review): -1 presumably disables the per-message read limit so a
	// byte-stream consumer is not capped; confirm against Conn.SetReadLimit.
	c.SetReadLimit(-1)

	nc := &netConn{
		c:       c,
		msgType: msgType,
		readMu:  newMu(c),
		writeMu: newMu(c),
	}

	nc.writeCtx, nc.writeCancel = context.WithCancel(ctx)
	nc.readCtx, nc.readCancel = context.WithCancel(ctx)

	// The timers are created with the largest possible duration and stopped
	// right away; they only become active once a deadline is set.
	nc.writeTimer = time.AfterFunc(math.MaxInt64, func() {
		if !nc.writeMu.tryLock() {
			// The lock is held, so a write is in progress: interrupt it.
			nc.writeCancel()
			return
		}
		defer nc.writeMu.unlock()

		// No write is in progress: make future writes fail until the
		// deadline is reset.
		atomic.StoreInt64(&nc.writeExpired, 1)
	})
	// A timer created by AfterFunc has a nil C channel, so there is nothing
	// to drain after Stop; receiving from it would block forever.
	nc.writeTimer.Stop()

	nc.readTimer = time.AfterFunc(math.MaxInt64, func() {
		if !nc.readMu.tryLock() {
			// The lock is held, so a read is in progress: interrupt it.
			nc.readCancel()
			return
		}
		defer nc.readMu.unlock()

		// No read is in progress: make future reads fail until the
		// deadline is reset.
		atomic.StoreInt64(&nc.readExpired, 1)
	})
	nc.readTimer.Stop()

	return nc
}
// netConn is the net.Conn adapter returned by NetConn. It keeps independent
// timer, lock, expiry flag and context state for the read and write
// directions.
type netConn struct {
	// readExpired and writeExpired are accessed through the sync/atomic
	// primitive functions. They must remain the first fields so that they
	// are 64-bit aligned on 32-bit platforms, where only the first word of
	// an allocated struct is guaranteed to have that alignment.
	readExpired  int64 // 1 once the read deadline has passed with no read in flight
	writeExpired int64 // 1 once the write deadline has passed with no write in flight

	c       *Conn       // underlying websocket connection
	msgType MessageType // message type written, and the only type accepted on read

	writeTimer  *time.Timer        // fires when the write deadline passes
	writeMu     *mu                // held for the duration of Write
	writeCtx    context.Context    // context passed to every write
	writeCancel context.CancelFunc // cancels writeCtx

	readTimer  *time.Timer        // fires when the read deadline passes
	readMu     *mu                // held for the duration of Read
	readCtx    context.Context    // context passed to every read
	readCancel context.CancelFunc // cancels readCtx
	readEOFed  bool               // set after a normal/going-away close; reads then return io.EOF
	reader     io.Reader          // reader of the message currently being consumed, nil between messages
}
// Compile-time check that *netConn satisfies net.Conn.
var _ net.Conn = (*netConn)(nil)
// Close stops both deadline timers, cancels the read and write contexts and
// then closes the underlying connection with StatusNormalClosure and an empty
// reason. The error from the underlying Close is returned.
func (nc *netConn) Close() error {
	// shutdown disarms one direction: timer first, then its context.
	shutdown := func(timer *time.Timer, cancel context.CancelFunc) {
		timer.Stop()
		cancel()
	}

	shutdown(nc.writeTimer, nc.writeCancel)
	shutdown(nc.readTimer, nc.readCancel)

	return nc.c.Close(StatusNormalClosure, "")
}
// Write sends p as one message of the configured type.
//
// It holds the write lock for the whole call. If the write deadline has
// already expired it returns an error wrapping context.DeadlineExceeded
// without writing. On success it reports len(p); on failure it reports 0
// together with the error from the underlying connection.
func (nc *netConn) Write(p []byte) (int, error) {
	nc.writeMu.forceLock()
	defer nc.writeMu.unlock()

	expired := atomic.LoadInt64(&nc.writeExpired) == 1
	if expired {
		return 0, fmt.Errorf("failed to write: %w", context.DeadlineExceeded)
	}

	if err := nc.c.Write(nc.writeCtx, nc.msgType, p); err != nil {
		return 0, err
	}
	return len(p), nil
}
// Read fills p with bytes from incoming messages and returns the number of
// bytes read.
//
// It holds the read lock for the whole call and keeps calling the internal
// read until it yields at least one byte or an error, so an empty message is
// skipped rather than surfaced as a (0, nil) result.
//
// A zero-length p returns (0, nil) immediately. Without that guard no read
// could ever produce n != 0, and the loop below would only end on an error.
func (nc *netConn) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}

	nc.readMu.forceLock()
	defer nc.readMu.unlock()

	for {
		n, err := nc.read(p)
		if err != nil || n != 0 {
			return n, err
		}
		// n == 0 with no error: the current message ended or was empty;
		// try again with the next one.
	}
}
// read performs a single read attempt into p. The caller must hold readMu.
//
// It fails with an error wrapping context.DeadlineExceeded when the read
// deadline has expired, and with io.EOF once the peer has closed with
// StatusNormalClosure or StatusGoingAway. When no message is being consumed it
// fetches the next one; a message of a type other than nc.msgType closes the
// connection with StatusUnsupportedData and is reported as an error. Reaching
// the end of a message is not an error: the reader is dropped and the bytes
// read so far are returned with a nil error.
func (nc *netConn) read(p []byte) (int, error) {
	if atomic.LoadInt64(&nc.readExpired) == 1 {
		return 0, fmt.Errorf("failed to read: %w", context.DeadlineExceeded)
	}
	if nc.readEOFed {
		return 0, io.EOF
	}

	if nc.reader == nil {
		gotType, msgReader, err := nc.c.Reader(nc.readCtx)
		if err != nil {
			// A clean shutdown by the peer is surfaced as a sticky io.EOF.
			status := CloseStatus(err)
			if status == StatusNormalClosure || status == StatusGoingAway {
				nc.readEOFed = true
				return 0, io.EOF
			}
			return 0, err
		}
		if gotType != nc.msgType {
			typeErr := fmt.Errorf("unexpected frame type read (expected %v): %v", nc.msgType, gotType)
			// Best-effort close; the type mismatch is the error reported.
			nc.c.Close(StatusUnsupportedData, typeErr.Error())
			return 0, typeErr
		}
		nc.reader = msgReader
	}

	n, err := nc.reader.Read(p)
	if err != io.EOF {
		return n, err
	}

	// End of the current message: forget its reader so the next call
	// fetches a new message.
	nc.reader = nil
	return n, nil
}
// websocketAddr is a placeholder address type carrying no data; both of its
// methods return fixed strings.
type websocketAddr struct{}

// Network returns the fixed network name "websocket".
func (websocketAddr) Network() string {
	return "websocket"
}

// String returns the fixed placeholder address "websocket/unknown-addr".
func (websocketAddr) String() string {
	return "websocket/unknown-addr"
}
// SetDeadline applies t as both the write deadline and the read deadline, in
// that order. It always returns nil.
func (nc *netConn) SetDeadline(t time.Time) error {
	// Both setters unconditionally return nil, so their results are
	// discarded explicitly.
	_ = nc.SetWriteDeadline(t)
	_ = nc.SetReadDeadline(t)
	return nil
}
// SetWriteDeadline clears the write-expired flag and re-arms the write timer
// for t. A zero t stops the timer, removing the deadline. A t that is not in
// the future arms the timer with the smallest positive duration so that it
// fires almost immediately. It always returns nil.
func (nc *netConn) SetWriteDeadline(t time.Time) error {
	atomic.StoreInt64(&nc.writeExpired, 0)

	if t.IsZero() {
		nc.writeTimer.Stop()
		return nil
	}

	remaining := time.Until(t)
	if remaining < 1 {
		// Deadline already reached: clamp to one nanosecond.
		remaining = 1
	}
	nc.writeTimer.Reset(remaining)
	return nil
}
// SetReadDeadline clears the read-expired flag and re-arms the read timer for
// t. A zero t stops the timer, removing the deadline. A t that is not in the
// future arms the timer with the smallest positive duration so that it fires
// almost immediately. It always returns nil.
func (nc *netConn) SetReadDeadline(t time.Time) error {
	atomic.StoreInt64(&nc.readExpired, 0)

	if t.IsZero() {
		nc.readTimer.Stop()
		return nil
	}

	remaining := time.Until(t)
	if remaining < 1 {
		// Deadline already reached: clamp to one nanosecond.
		remaining = 1
	}
	nc.readTimer.Reset(remaining)
	return nil
}
The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.