package mtproto

import (
	"context"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-faster/errors"
	"go.uber.org/zap"

	"github.com/gotd/td/bin"
	"github.com/gotd/td/internal/crypto"
	"github.com/gotd/td/internal/proto"
	"github.com/gotd/td/internal/proto/codec"
)

// https://core.telegram.org/mtproto/description#message-identifier-msg-id
// A message is rejected over 300 seconds after it is created or 30 seconds
// before it is created (this is needed to protect from replay attacks).
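// A msg_id approximately equals unixtime * 2^32, so the message creation
// time can be recovered from its high 32 bits.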
const (
	maxPast   = time.Second * 300
	maxFuture = time.Second * 30
)

// errRejected is returned for an invalid message that should not be processed.
var errRejected = errors.New("message rejected")

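// checkMessageID validates an incoming message identifier: it must be
// server-originated and its embedded creation time must be within the
// allowed window relative to now.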
func checkMessageID(now time.Time, rawID int64) error {
	id := proto.MessageID(rawID)

	// Check that message is from server.
	switch id.Type() {
	case proto.MessageFromServer, proto.MessageServerResponse:
		// Valid.
	default:
		return errors.Wrapf(errRejected, "unexpected type %s", id.Type())
	}

	created := id.Time()
	if created.Before(now) && now.Sub(created) > maxPast {
		return errors.Wrap(errRejected, "created too far in past")
	}
	if created.Sub(now) > maxFuture {
		return errors.Wrap(errRejected, "created too far in future")
	}

	return nil
}

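// decryptMessage decrypts an incoming message from b and validates its
// session id and message id to protect from replay attacks.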
func (c *Conn) decryptMessage(b *bin.Buffer) (*crypto.EncryptedMessageData, error) {
	session := c.session()
	msg, err := c.cipher.DecryptFromBuffer(session.Key, b)
	if err != nil {
		return nil, errors.Wrap(err, "decrypt")
	}

	// Validating message. This protects from replay attacks.
	if msg.SessionID != session.ID {
		return nil, errors.Wrapf(errRejected, "invalid session (got %d, expected %d)", msg.SessionID, session.ID)
	}
	if err := checkMessageID(c.clock.Now(), msg.MessageID); err != nil {
		return nil, errors.Wrapf(err, "bad message id %d", msg.MessageID)
	}
	if !c.messageIDBuf.Consume(msg.MessageID) {
		return nil, errors.Wrapf(errRejected, "duplicate or too low message id %d", msg.MessageID)
	}

	return msg, nil
}

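// consumeMessage decrypts, handles and (if needed) acknowledges a single
// incoming message.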
func (c *Conn) consumeMessage(ctx context.Context, buf *bin.Buffer) error {
	msg, err := c.decryptMessage(buf)
	if errors.Is(err, errRejected) {
		c.log.Warn("Ignoring rejected message", zap.Error(err))
		return nil
	}
	if err != nil {
		return errors.Wrap(err, "consume message")
	}

	if err := c.handleMessage(msg.MessageID, &bin.Buffer{Buf: msg.Data()}); err != nil {
		// We could probably return here, but that would shut down the
		// whole connection, which can be unexpected.
		c.log.Warn("Error while handling message", zap.Error(err))
		// Sending acknowledgment even on error. The client should recover
		// missing updates via an explicit pts check and getDiff call.
	}

	// Odd sequence numbers denote content-related messages that the server
	// expects us to acknowledge explicitly.
	needAck := (msg.SeqNo & 0x01) != 0
	if needAck {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case c.ackSendChan <- msg.MessageID:
		}
	}

	return nil
}

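// noUpdates reports whether err is an expected read timeout, i.e. no
// updates were received before the read deadline.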
func (c *Conn) noUpdates(err error) bool {
	// Checking for read timeout.
	var opErr *net.OpError
	if errors.As(err, &opErr) && opErr.Timeout() {
		// We call SetReadDeadline, so such an error is expected.
		c.log.Debug("No updates")
		return true
	}
	return false
}

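// handleAuthKeyNotFound re-creates the authorization key after the server
// reported that the key we provided was not found.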
func (c *Conn) handleAuthKeyNotFound(ctx context.Context) error {
	if c.session().ID == 0 {
		// The 404 error can also be caused by zero session id.
		// See https://github.com/gotd/td/issues/107
		//
		// We should recover from this in createAuthKey, but in general
		// this code branch should be unreachable.
		c.log.Warn("BUG: zero session id found")
	}
	c.log.Warn("Re-generating keys (server did not find the key we provided)")
	if err := c.createAuthKey(ctx); err != nil {
		return errors.Wrap(err, "unable to create auth key")
	}
	c.log.Info("Re-created auth keys")
	// The request will be retried by the ack loop.
	// We could probably speed this up.
	return nil
}

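// readLoop receives messages from the connection and processes each of them
// in a separate goroutine until ctx is done or a critical error occurs.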
func (c *Conn) readLoop(ctx context.Context) (err error) {
	log := c.log.Named("read")
	log.Debug("Read loop started")
	defer func() {
		l := log
		if err != nil {
			l = l.With(zap.NamedError("reason", err))
		}
		l.Debug("Read loop done")
	}()

	var (
		// Last error encountered by consumeMessage.
		lastErr atomic.Value
		// To wait for all spawned goroutines.
		wg sync.WaitGroup
	)
	defer wg.Wait()

	for {
		// We've tried multiple ways to reduce allocations by reusing the
		// buffer, but a naive implementation induces high idle memory waste.
		//
		// Proper optimization will probably require a total rework of
		// bin.Buffer with a sharded (by payload size?) pool that can be used
		// after the message size is read (after readLen).
		//
		// Such an optimization would introduce additional complexity overhead
		// and is probably not worth it.
		buf := &bin.Buffer{}

		// Halting if consumeMessage encountered error.
		// Should be something critical with crypto.
		if consumeErr, ok := lastErr.Load().(error); ok && consumeErr != nil {
			return errors.Wrap(consumeErr, "halting")
		}
		}

		if err := c.conn.Recv(ctx, buf); err != nil {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				if c.noUpdates(err) {
					continue
				}
			}

			var protoErr *codec.ProtocolErr
			if errors.As(err, &protoErr) && protoErr.Code == codec.CodeAuthKeyNotFound {
				if err := c.handleAuthKeyNotFound(ctx); err != nil {
					return errors.Wrap(err, "auth key not found")
				}

				continue
			}

			select {
			case <-ctx.Done():
				return errors.Wrap(ctx.Err(), "read loop")
			default:
				return errors.Wrap(err, "read")
			}
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			// Spawning a goroutine per incoming message to utilize as many
			// resources as possible while keeping idle utilization low.
			//
			// The "worker" model was replaced by this due to idle utilization
			// overhead, especially on multi-CPU systems with multiple running
			// clients.
			if err := c.consumeMessage(ctx, buf); err != nil {
				log.Error("Failed to process message", zap.Error(err))
				lastErr.Store(errors.Wrap(err, "consume"))
			}
		}()
	}
}