package mtprotoimport ()// https://core.telegram.org/mtproto/description#message-identifier-msg-id// A message is rejected over 300 seconds after it is created or 30 seconds// before it is created (this is needed to protect from replay attacks).const (maxPast = time.Second * 300maxFuture = time.Second * 30)// errRejected is returned on invalid message that should not be processed.varerrRejected = errors.New("message rejected")func ( time.Time, int64) error { := proto.MessageID()// Check that message is from server.switch .Type() {caseproto.MessageFromServer, proto.MessageServerResponse:// Valid.default:returnerrors.Wrapf(errRejected, "unexpected type %s", .Type()) } := .Time()if .Before() && .Sub() > maxPast {returnerrors.Wrap(errRejected, "created too far in past") }if .Sub() > maxFuture {returnerrors.Wrap(errRejected, "created too far in future") }returnnil}func ( *Conn) ( *bin.Buffer) (*crypto.EncryptedMessageData, error) { := .session() , := .cipher.DecryptFromBuffer(.Key, )if != nil {returnnil, errors.Wrap(, "decrypt") }// Validating message. This protects from replay attacks.if .SessionID != .ID {returnnil, errors.Wrapf(errRejected, "invalid session (got %d, expected %d)", .SessionID, .ID) }if := checkMessageID(.clock.Now(), .MessageID); != nil {returnnil, errors.Wrapf(, "bad message id %d", .MessageID) }if !.messageIDBuf.Consume(.MessageID) {returnnil, errors.Wrapf(errRejected, "duplicate or too low message id %d", .MessageID) }return , nil}func ( *Conn) ( context.Context, *bin.Buffer) error { , := .decryptMessage()iferrors.Is(, errRejected) { .log.Warn(, "Ignoring rejected message", log.Error())returnnil }if != nil {returnerrors.Wrap(, "consume message") }if := .handleMessage(.MessageID, &bin.Buffer{Buf: .Data()}); != nil {// Probably we can return here, but this will shutdown whole // connection which can be unexpected. .log.Warn(, "Error while handling message", log.Error())// Sending acknowledge even on error. Client should restore // from missing updates via explicit pts check and getDiff call. } := (.SeqNo & 0x01) != 0if {select {case<-.Done():return .Err()case .ackSendChan<- .MessageID: } }returnnil}func ( *Conn) ( error) bool {// Checking for read timeout.var *net.OpErroriferrors.As(, &) && .Timeout() {// We call SetReadDeadline so such error is expected. .log.Debug(context.Background(), "No updates")returntrue }returnfalse}func ( *Conn) ( context.Context) error {if .session().ID == 0 {// The 404 error can also be caused by zero session id. // See https://github.com/gotd/td/issues/107 // // We should recover from this in createAuthKey, but in general // this code branch should be unreachable. .log.Warn(, "BUG: zero session id found") }if .pfs {// In PFS mode 404 most likely means lost temporary key, so caller should // recreate transport and re-bind, not regenerate permanent key in-place.returnerrors.Wrap(ErrPFSReconnectRequired, "temporary auth key not found in pfs mode") } .log.Warn(, "Re-generating keys (server not found key that we provided)")if := .createAuthKey(); != nil {returnerrors.Wrap(, "unable to create auth key") } .log.Info(, "Re-created auth keys")// Request will be retried by ack loop. // Probably we can speed-up this.returnnil}func ( *Conn) ( context.Context) ( error) { := .log.Named("read") .Debug(, "Read loop started")deferfunc() { := if != nil { = .With(log.NamedError("reason", )) } .Debug(, "Read loop done") }()var (// Last error encountered by consumeMessage.atomic.Value// To wait all spawned goroutinessync.WaitGroup )defer .Wait()for {// We've tried multiple ways to reduce allocations via reusing buffer, // but naive implementation induces high idle memory waste. // // Proper optimization will probably require total rework of bin.Buffer // with sharded (by payload size?) pool that can be used after message // size read (after readLen). // // Such optimization can introduce additional complexity overhead and // is probably not worth it. := &bin.Buffer{}// Halting if consumeMessage encountered error. // Should be something critical with crypto.if , := .Load().(error); && != nil {returnerrors.Wrap(, "halting") }if := .conn.Recv(, ); != nil {select {case<-.Done():return .Err()default:if .noUpdates() {continue } }var *codec.ProtocolErriferrors.As(, &) && .Code == codec.CodeAuthKeyNotFound {if := .handleAuthKeyNotFound(); != nil {returnerrors.Wrap(, "auth key not found") }continue }select {case<-.Done():returnerrors.Wrap(.Err(), "read loop")default:returnerrors.Wrap(, "read") } } .Add(1)gofunc() {defer .Done()// Spawning goroutine per incoming message to utilize as much // resources as possible while keeping idle utilization low. // // The "worker" model was replaced by this due to idle utilization // overhead, especially on multi-CPU systems with multiple running // clients.if := .consumeMessage(, ); != nil { .Error(, "Failed to process message", log.Error()) .Store(errors.Wrap(, "consume")) } }() }}
The pages are generated with Goldsv0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.