//go:build !js
// +build !js

package websocket

import (
	
	
	
	
	
	
	
	

	
	
	
)

// Reader reads from the connection until there is a WebSocket
// data message to be read. It will handle ping, pong and close frames as appropriate.
//
// It returns the type of the message and an io.Reader to read it.
// The passed context will also bound the reader.
// Ensure you read to EOF otherwise the connection will hang.
//
// Call CloseRead if you do not expect any data messages from the peer.
//
// Only one Reader may be open at a time.
//
// If you need a separate timeout on the Reader call and the Read itself,
// use time.AfterFunc to cancel the context passed in.
// See https://github.com/nhooyr/websocket/issues/87#issue-451703332
// Most users should not need this.
func ( *Conn) ( context.Context) (MessageType, io.Reader, error) {
	return .reader()
}

// Read is a convenience method around Reader to read a single message
// from the connection.
func ( *Conn) ( context.Context) (MessageType, []byte, error) {
	, ,  := .Reader()
	if  != nil {
		return 0, nil, 
	}

	,  := io.ReadAll()
	return , , 
}

// CloseRead starts a goroutine to read from the connection until it is closed
// or a data message is received.
//
// Once CloseRead is called you cannot read any messages from the connection.
// The returned context will be cancelled when the connection is closed.
//
// If a data message is received, the connection will be closed with StatusPolicyViolation.
//
// Call CloseRead when you do not expect to read any more messages.
// Since it actively reads from the connection, it will ensure that ping, pong and close
// frames are responded to. This means c.Ping and c.Close will still work as expected.
func ( *Conn) ( context.Context) context.Context {
	,  := context.WithCancel()

	.wg.Add(1)
	go func() {
		defer .CloseNow()
		defer .wg.Done()
		defer ()
		, ,  := .Reader()
		if  == nil {
			.Close(StatusPolicyViolation, "unexpected data message")
		}
	}()
	return 
}

// SetReadLimit sets the max number of bytes to read for a single message.
// It applies to the Reader and Read methods.
//
// By default, the connection has a message read limit of 32768 bytes.
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
//
// Set to -1 to disable.
func ( *Conn) ( int64) {
	if  >= 0 {
		// We read one more byte than the limit in case
		// there is a fin frame that needs to be read.
		++
	}

	.msgReader.limitReader.limit.Store()
}

const defaultReadLimit = 32768

func ( *Conn) *msgReader {
	 := &msgReader{
		c:   ,
		fin: true,
	}
	.readFunc = .read

	.limitReader = newLimitReader(, .readFunc, defaultReadLimit+1)
	return 
}

func ( *msgReader) () {
	if .flateContextTakeover() {
		if .dict == nil {
			.dict = &slidingWindow{}
		}
		.dict.init(32768)
	}
	if .flateBufio == nil {
		.flateBufio = getBufioReader(.readFunc)
	}

	if .flateContextTakeover() {
		.flateReader = getFlateReader(.flateBufio, .dict.buf)
	} else {
		.flateReader = getFlateReader(.flateBufio, nil)
	}
	.limitReader.r = .flateReader
	.flateTail.Reset(deflateMessageTail)
}

func ( *msgReader) () {
	if .flateReader != nil {
		putFlateReader(.flateReader)
		.flateReader = nil
	}
}

func ( *msgReader) () {
	.c.readMu.forceLock()
	.putFlateReader()
	if .dict != nil {
		.dict.close()
		.dict = nil
	}
	if .flateBufio != nil {
		putBufioReader(.flateBufio)
	}

	if .c.client {
		putBufioReader(.c.br)
		.c.br = nil
	}
}

func ( *msgReader) () bool {
	if .c.client {
		return !.c.copts.serverNoContextTakeover
	}
	return !.c.copts.clientNoContextTakeover
}

func ( *Conn) ( header) bool {
	// If compression is disabled, rsv1 is illegal.
	if !.flate() {
		return true
	}
	// rsv1 is only allowed on data frames beginning messages.
	if .opcode != opText && .opcode != opBinary {
		return true
	}
	return false
}

func ( *Conn) ( context.Context) (header, error) {
	for {
		,  := .readFrameHeader()
		if  != nil {
			return header{}, 
		}

		if .rsv1 && .readRSV1Illegal() || .rsv2 || .rsv3 {
			 := fmt.Errorf("received header with unexpected rsv bits set: %v:%v:%v", .rsv1, .rsv2, .rsv3)
			.writeError(StatusProtocolError, )
			return header{}, 
		}

		if !.client && !.masked {
			return header{}, errors.New("received unmasked frame from client")
		}

		switch .opcode {
		case opClose, opPing, opPong:
			 = .handleControl(, )
			if  != nil {
				// Pass through CloseErrors when receiving a close frame.
				if .opcode == opClose && CloseStatus() != -1 {
					return header{}, 
				}
				return header{}, fmt.Errorf("failed to handle control frame %v: %w", .opcode, )
			}
		case opContinuation, opText, opBinary:
			return , nil
		default:
			 := fmt.Errorf("received unknown opcode %v", .opcode)
			.writeError(StatusProtocolError, )
			return header{}, 
		}
	}
}

func ( *Conn) ( context.Context) (header, error) {
	select {
	case <-.closed:
		return header{}, net.ErrClosed
	case .readTimeout <- :
	}

	,  := readFrameHeader(.br, .readHeaderBuf[:])
	if  != nil {
		select {
		case <-.closed:
			return header{}, net.ErrClosed
		case <-.Done():
			return header{}, .Err()
		default:
			.close()
			return header{}, 
		}
	}

	select {
	case <-.closed:
		return header{}, net.ErrClosed
	case .readTimeout <- context.Background():
	}

	return , nil
}

func ( *Conn) ( context.Context,  []byte) (int, error) {
	select {
	case <-.closed:
		return 0, net.ErrClosed
	case .readTimeout <- :
	}

	,  := io.ReadFull(.br, )
	if  != nil {
		select {
		case <-.closed:
			return , net.ErrClosed
		case <-.Done():
			return , .Err()
		default:
			 = fmt.Errorf("failed to read frame payload: %w", )
			.close()
			return , 
		}
	}

	select {
	case <-.closed:
		return , net.ErrClosed
	case .readTimeout <- context.Background():
	}

	return , 
}

func ( *Conn) ( context.Context,  header) ( error) {
	if .payloadLength < 0 || .payloadLength > maxControlPayload {
		 := fmt.Errorf("received control frame payload with invalid length: %d", .payloadLength)
		.writeError(StatusProtocolError, )
		return 
	}

	if !.fin {
		 := errors.New("received fragmented control frame")
		.writeError(StatusProtocolError, )
		return 
	}

	,  := context.WithTimeout(, time.Second*5)
	defer ()

	 := .readControlBuf[:.payloadLength]
	_,  = .readFramePayload(, )
	if  != nil {
		return 
	}

	if .masked {
		mask(.maskKey, )
	}

	switch .opcode {
	case opPing:
		return .writeControl(, opPong, )
	case opPong:
		.activePingsMu.Lock()
		,  := .activePings[string()]
		.activePingsMu.Unlock()
		if  {
			select {
			case  <- struct{}{}:
			default:
			}
		}
		return nil
	}

	defer func() {
		.readCloseFrameErr = 
	}()

	,  := parseClosePayload()
	if  != nil {
		 = fmt.Errorf("received invalid close payload: %w", )
		.writeError(StatusProtocolError, )
		return 
	}

	 = fmt.Errorf("received close frame: %w", )
	.setCloseErr()
	.writeClose(.Code, .Reason)
	.close()
	return 
}

func ( *Conn) ( context.Context) ( MessageType,  io.Reader,  error) {
	defer errd.Wrap(&, "failed to get reader")

	 = .readMu.lock()
	if  != nil {
		return 0, nil, 
	}
	defer .readMu.unlock()

	if !.msgReader.fin {
		 = errors.New("previous message not read to completion")
		.close(fmt.Errorf("failed to get reader: %w", ))
		return 0, nil, 
	}

	,  := .readLoop()
	if  != nil {
		return 0, nil, 
	}

	if .opcode == opContinuation {
		 := errors.New("received continuation frame without text or binary frame")
		.writeError(StatusProtocolError, )
		return 0, nil, 
	}

	.msgReader.reset(, )

	return MessageType(.opcode), .msgReader, nil
}

type msgReader struct {
	c *Conn

	ctx         context.Context
	flate       bool
	flateReader io.Reader
	flateBufio  *bufio.Reader
	flateTail   strings.Reader
	limitReader *limitReader
	dict        *slidingWindow

	fin           bool
	payloadLength int64
	maskKey       uint32

	// util.ReaderFunc(mr.Read) to avoid continuous allocations.
	readFunc util.ReaderFunc
}

func ( *msgReader) ( context.Context,  header) {
	.ctx = 
	.flate = .rsv1
	.limitReader.reset(.readFunc)

	if .flate {
		.resetFlate()
	}

	.setFrame()
}

func ( *msgReader) ( header) {
	.fin = .fin
	.payloadLength = .payloadLength
	.maskKey = .maskKey
}

func ( *msgReader) ( []byte) ( int,  error) {
	 = .c.readMu.lock(.ctx)
	if  != nil {
		return 0, fmt.Errorf("failed to read: %w", )
	}
	defer .c.readMu.unlock()

	,  = .limitReader.Read()
	if .flate && .flateContextTakeover() {
		 = [:]
		.dict.write()
	}
	if errors.Is(, io.EOF) || errors.Is(, io.ErrUnexpectedEOF) && .fin && .flate {
		.putFlateReader()
		return , io.EOF
	}
	if  != nil {
		 = fmt.Errorf("failed to read: %w", )
		.c.close()
	}
	return , 
}

func ( *msgReader) ( []byte) (int, error) {
	for {
		if .payloadLength == 0 {
			if .fin {
				if .flate {
					return .flateTail.Read()
				}
				return 0, io.EOF
			}

			,  := .c.readLoop(.ctx)
			if  != nil {
				return 0, 
			}
			if .opcode != opContinuation {
				 := errors.New("received new data message without finishing the previous message")
				.c.writeError(StatusProtocolError, )
				return 0, 
			}
			.setFrame()

			continue
		}

		if int64(len()) > .payloadLength {
			 = [:.payloadLength]
		}

		,  := .c.readFramePayload(.ctx, )
		if  != nil {
			return , 
		}

		.payloadLength -= int64()

		if !.c.client {
			.maskKey = mask(.maskKey, )
		}

		return , nil
	}
}

type limitReader struct {
	c     *Conn
	r     io.Reader
	limit xsync.Int64
	n     int64
}

func ( *Conn,  io.Reader,  int64) *limitReader {
	 := &limitReader{
		c: ,
	}
	.limit.Store()
	.reset()
	return 
}

func ( *limitReader) ( io.Reader) {
	.n = .limit.Load()
	.r = 
}

func ( *limitReader) ( []byte) (int, error) {
	if .n < 0 {
		return .r.Read()
	}

	if .n == 0 {
		 := fmt.Errorf("read limited at %v bytes", .limit.Load())
		.c.writeError(StatusMessageTooBig, )
		return 0, 
	}

	if int64(len()) > .n {
		 = [:.n]
	}
	,  := .r.Read()
	.n -= int64()
	if .n < 0 {
		.n = 0
	}
	return , 
}