//go:build !js
// +build !js

package websocket

import (
	
	
	
	
	
	
	
	
	
	
)

// MessageType identifies the kind of payload carried by a WebSocket message.
// See https://tools.ietf.org/html/rfc6455#section-5.6
type MessageType int

// The supported MessageType values. Numbering starts at 1, so the zero
// value of MessageType is not one of the declared constants.
const (
	// MessageText marks a UTF-8 encoded text message, such as JSON.
	MessageText MessageType = 1
	// MessageBinary marks a binary message, such as a protobuf.
	MessageBinary MessageType = 2
)

// Conn is a single WebSocket connection.
//
// Every method is safe for concurrent use, with the exception of Reader
// and Read.
//
// The connection must be read from continuously; control frames are only
// handled while a read is in progress. See Reader and CloseRead.
//
// Call Close once the connection is no longer needed so that the resources
// tied to it are released.
//
// Any error returned by any method causes the connection to be closed with
// an appropriate reason. Unfortunately that includes context expirations.
// See https://github.com/nhooyr/websocket/issues/242#issuecomment-633182220
type Conn struct {
	// noCopy is a zero-size marker field.
	noCopy noCopy

	// The fields below are copied from connConfig at construction time.

	// subprotocol is the negotiated subprotocol; empty means the default.
	subprotocol string
	// rwc is the underlying stream; it is closed when the Conn closes.
	rwc io.ReadWriteCloser
	// client gates extraction of writeBuf from bw at construction time.
	client bool
	// copts is non-nil when compression is in use.
	copts *compressionOptions
	// flateThreshold defaults to 128 or 512 at construction when left at
	// zero and compression is in use.
	flateThreshold int
	br             *bufio.Reader
	bw             *bufio.Writer

	// Contexts sent on these channels are received by the timeout loop,
	// which reacts when one of them is done.
	readTimeout, writeTimeout chan context.Context

	// Read state.
	readMu            *mu
	readHeaderBuf     [8]byte
	readControlBuf    [maxControlPayload]byte
	msgReader         *msgReader
	readCloseFrameErr error

	// Write state.
	msgWriter      *msgWriter
	writeFrameMu   *mu
	writeBuf       []byte
	writeHeaderBuf [8]byte
	writeHeader    header

	// wg tracks the background goroutines started on behalf of the Conn.
	wg sync.WaitGroup
	// closed is closed exactly once, when the connection is closed.
	closed chan struct{}
	// closeMu serializes the close path.
	closeMu    sync.Mutex
	closeErr   error
	wroteClose bool

	// pingCounter is incremented atomically for every Ping call.
	pingCounter int32
	// activePingsMu guards activePings.
	activePingsMu sync.Mutex
	// activePings holds one signal channel per outstanding ping.
	activePings map[string]chan<- struct{}
}

// connConfig bundles the values a Conn is constructed from. Every field is
// copied into the Conn field of the same name by the constructor that takes
// a connConfig.
type connConfig struct {
	// subprotocol is the value later returned by the Conn's subprotocol
	// accessor.
	subprotocol    string
	// rwc is the underlying stream the Conn reads from, writes to and closes.
	rwc            io.ReadWriteCloser
	// client gates extraction of the Conn's write buffer from bw.
	client         bool
	// copts is non-nil when compression is in use.
	copts          *compressionOptions
	// flateThreshold may be left at zero; the constructor then picks a
	// default when compression is in use.
	flateThreshold int

	// br and bw are the buffered reader and writer handed to the Conn.
	br *bufio.Reader
	bw *bufio.Writer
}

// Builds a *Conn from a connConfig. It copies the config fields, creates the
// timeout channels, the closed channel and the active-ping map, installs the
// read and write-frame locks plus the message reader and writer, chooses a
// default flateThreshold, registers a finalizer, and starts the timeout loop
// in a goroutine tracked by wg.
//
// NOTE(review): the function name and the parameter/local identifiers are
// missing from this copy of the file, so the block does not compile as
// shown; restore them before building.
func ( connConfig) *Conn {
	 := &Conn{
		subprotocol:    .subprotocol,
		rwc:            .rwc,
		client:         .client,
		copts:          .copts,
		flateThreshold: .flateThreshold,

		br: .br,
		bw: .bw,

		// Unbuffered: a send blocks until the timeout loop receives it.
		readTimeout:  make(chan context.Context),
		writeTimeout: make(chan context.Context),

		closed:      make(chan struct{}),
		activePings: make(map[string]chan<- struct{}),
	}

	.readMu = newMu()
	.writeFrameMu = newMu()

	.msgReader = newMsgReader()

	.msgWriter = newMsgWriter()
	// Only when client is set is writeBuf taken from bw.
	if .client {
		.writeBuf = extractBufioWriterBuf(.bw, .rwc)
	}

	// With compression in use and no threshold configured, default to 128,
	// or to 512 when the message writer reports no flate context takeover.
	if .flate() && .flateThreshold == 0 {
		.flateThreshold = 128
		if !.msgWriter.flateContextTakeover() {
			.flateThreshold = 512
		}
	}

	// If the Conn is garbage collected without having been closed, close it
	// with an error that says so.
	runtime.SetFinalizer(, func( *Conn) {
		.close(errors.New("connection garbage collected"))
	})

	// The timeout loop runs for the life of the connection; wg tracks it.
	.wg.Add(1)
	go func() {
		defer .wg.Done()
		.timeoutLoop()
	}()

	return 
}

// Subprotocol returns the negotiated subprotocol.
// An empty string means the default protocol.
//
// The value is the Conn's subprotocol field, returned as is.
//
// NOTE(review): the method name and receiver identifier are missing from
// this copy of the file; restore them before building.
func ( *Conn) () string {
	return .subprotocol
}

// Closes the connection at most once. Under closeMu it returns early if the
// connection is already closed; otherwise it records a close error via
// setCloseErrLocked, closes the closed channel, clears the finalizer, closes
// rwc, and finally closes the message writer and reader in a goroutine
// tracked by wg.
//
// NOTE(review): the method name, receiver and parameter identifiers are
// missing from this copy of the file; presumably this is the `close` method
// invoked elsewhere in the file. Restore them before building.
func ( *Conn) ( error) {
	.closeMu.Lock()
	defer .closeMu.Unlock()

	if .isClosed() {
		return
	}
	// Presumably: when no error was supplied, the result of closing rwc is
	// used as the close error instead.
	// NOTE(review): on that path rwc.Close runs again further down and the
	// second result is discarded. Confirm rwc tolerates a double Close.
	if  == nil {
		 = .rwc.Close()
	}
	.setCloseErrLocked()

	close(.closed)
	// The connection is closed explicitly, so the finalizer is not needed.
	runtime.SetFinalizer(, nil)

	// Have to close after c.closed is closed to ensure any goroutine that wakes up
	// from the connection being closed also sees that c.closed is closed and returns
	// closeErr.
	.rwc.Close()

	// Closing the writer and reader happens off this goroutine, so the
	// caller (which holds closeMu) does not wait for it.
	.wg.Add(1)
	go func() {
		defer .wg.Done()
		.msgWriter.close()
		.msgReader.close()
	}()
}

// Watches two contexts for expiry until the connection closes. Both start as
// context.Background() and are replaced by whatever is received on
// writeTimeout and readTimeout. The loop returns once closed is closed.
//
// When the context reported as "read timed out" is done, the close error is
// set and a StatusPolicyViolation error is written from a goroutine tracked
// by wg. When the context reported as "write timed out" is done, the
// connection is closed and the loop returns.
//
// NOTE(review): the method name, receiver and local identifiers are missing
// from this copy of the file; presumably this is the `timeoutLoop` method
// started by the constructor. Restore them before building.
func ( *Conn) () {
	 := context.Background()
	 := context.Background()

	for {
		select {
		case <-.closed:
			return

		// A newly received context replaces the one being watched.
		case  = <-.writeTimeout:
		case  = <-.readTimeout:

		// NOTE(review): this branch does not return. A done context stays
		// done, so the case can be selected again on later iterations (each
		// time starting another goroutine) until closed is closed. Confirm
		// this is intended.
		case <-.Done():
			.setCloseErr(fmt.Errorf("read timed out: %w", .Err()))
			.wg.Add(1)
			go func() {
				defer .wg.Done()
				.writeError(StatusPolicyViolation, errors.New("read timed out"))
			}()
		case <-.Done():
			.close(fmt.Errorf("write timed out: %w", .Err()))
			return
		}
	}
}

// Reports whether copts is non-nil.
//
// NOTE(review): the method name and receiver identifier are missing from
// this copy of the file; presumably this is the `flate` method consulted by
// the constructor when choosing flateThreshold. Restore them before
// building.
func ( *Conn) () bool {
	return .copts != nil
}

// Ping sends a ping to the peer and waits for a pong.
// Use this to measure latency or ensure the peer is responsive.
// Ping must be called concurrently with Reader as it does
// not read from the connection but instead waits for a Reader call
// to read the pong.
//
// TCP Keepalives should suffice for most use cases.
//
// Each call atomically increments pingCounter and passes the decimal string
// of the new value to ping. A non-nil error from ping is returned wrapped
// with the prefix "failed to ping: "; otherwise nil is returned.
//
// NOTE(review): the method name, receiver, parameter and local identifiers
// are missing from this copy of the file; restore them before building.
func ( *Conn) ( context.Context) error {
	 := atomic.AddInt32(&.pingCounter, 1)

	 := .ping(, strconv.Itoa(int()))
	if  != nil {
		return fmt.Errorf("failed to ping: %w", )
	}
	return nil
}

// Registers a signal channel in activePings, writes a ping control frame via
// writeControl with opPing, and then waits for one of three events:
//   - closed is closed: returns net.ErrClosed;
//   - the context is done: closes the connection and returns an error
//     prefixed "failed to wait for pong: ";
//   - the signal channel receives: returns nil.
//
// The activePings entry is removed again when the function returns. An error
// from writeControl is returned unchanged.
//
// NOTE(review): the method name, receiver, parameter and local identifiers
// are missing from this copy of the file; presumably this is the `ping`
// method called by Ping, with the string parameter used as both map key and
// ping payload. Restore them before building.
func ( *Conn) ( context.Context,  string) error {
	// Capacity 1, so a single send on the channel never blocks the sender.
	 := make(chan struct{}, 1)

	.activePingsMu.Lock()
	.activePings[] = 
	.activePingsMu.Unlock()

	// Always unregister on the way out, whichever path returns.
	defer func() {
		.activePingsMu.Lock()
		delete(.activePings, )
		.activePingsMu.Unlock()
	}()

	 := .writeControl(, opPing, []byte())
	if  != nil {
		return 
	}

	select {
	case <-.closed:
		return net.ErrClosed
	case <-.Done():
		 := fmt.Errorf("failed to wait for pong: %w", .Err())
		.close()
		return 
	case <-:
		return nil
	}
}

// mu is a lock built on a channel and associated with a Conn. Sending a
// value on ch acquires the lock and receiving one releases it. The
// context-aware acquire also watches the Conn's closed channel, so it can
// give up once the connection is closed.
type mu struct {
	// c is the connection whose closed channel the context-aware acquire
	// watches and which it closes when its context is done.
	c  *Conn
	// ch holds a value while the lock is held; the constructor creates it
	// with capacity 1.
	ch chan struct{}
}

// Returns a new, unlocked mu for a Conn, with a ch of capacity 1.
//
// NOTE(review): the function name and parameter identifier are missing from
// this copy of the file; presumably this is the `newMu` function called by
// the Conn constructor. Restore them before building.
func ( *Conn) *mu {
	return &mu{
		c:  ,
		ch: make(chan struct{}, 1),
	}
}

// Acquires the lock unconditionally by sending on ch, blocking until the
// send succeeds. Unlike the context-aware acquire, it watches neither a
// context nor the connection's closed channel.
//
// NOTE(review): the method name and receiver identifier are missing from
// this copy of the file; restore them before building.
func ( *mu) () {
	.ch <- struct{}{}
}

// Attempts to acquire the lock without blocking. It returns true when the
// send on ch succeeded (lock acquired) and false when the send could not
// proceed immediately.
//
// NOTE(review): the method name and receiver identifier are missing from
// this copy of the file; restore them before building.
func ( *mu) () bool {
	select {
	case .ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// Acquires the lock, giving up if the connection closes or the context ends
// first. It returns:
//   - net.ErrClosed when the connection's closed channel is closed;
//   - an error prefixed "failed to acquire lock: " when the context is done,
//     after also closing the connection;
//   - nil once the lock is held and the connection is still open.
//
// NOTE(review): the method name, receiver, parameter and local identifiers
// are missing from this copy of the file; restore them before building.
func ( *mu) ( context.Context) error {
	select {
	case <-.c.closed:
		return net.ErrClosed
	case <-.Done():
		 := fmt.Errorf("failed to acquire lock: %w", .Err())
		.c.close()
		return 
	case .ch <- struct{}{}:
		// To make sure the connection is certainly alive.
		// As it's possible the send on m.ch was selected
		// over the receive on closed.
		select {
		case <-.c.closed:
			// Make sure to release.
			.unlock()
			return net.ErrClosed
		default:
		}
		return nil
	}
}

// Releases the lock by receiving from ch. The receive is non-blocking, so
// calling this when the lock is not held is a no-op rather than a deadlock.
//
// NOTE(review): the method name and receiver identifier are missing from
// this copy of the file; presumably this is the `unlock` method called by
// the context-aware acquire. Restore them before building.
func ( *mu) () {
	select {
	case <-.ch:
	default:
	}
}

// noCopy is an empty struct type. Conn holds a field of this type as a
// marker; the type carries no data.
type noCopy struct{}

// An empty method on *noCopy.
//
// NOTE(review): the method name is missing from this copy of the file.
// Presumably it is Lock, so that go vet's copylocks check reports copies of
// values containing a noCopy; confirm and restore it before building.
func (*noCopy) () {}