package mtproto
import (
"context"
"net"
"sync"
"sync/atomic"
"time"
"github.com/go-faster/errors"
"go.uber.org/zap"
"github.com/gotd/td/bin"
"github.com/gotd/td/internal/crypto"
"github.com/gotd/td/internal/proto"
"github.com/gotd/td/internal/proto/codec"
)
const (
maxPast = time .Second * 300
maxFuture = time .Second * 30
)
var errRejected = errors .New ("message rejected" )
func checkMessageID (now time .Time , rawID int64 ) error {
id := proto .MessageID (rawID )
switch id .Type () {
case proto .MessageFromServer , proto .MessageServerResponse :
default :
return errors .Wrapf (errRejected , "unexpected type %s" , id .Type ())
}
created := id .Time ()
if created .Before (now ) && now .Sub (created ) > maxPast {
return errors .Wrap (errRejected , "created too far in past" )
}
if created .Sub (now ) > maxFuture {
return errors .Wrap (errRejected , "created too far in future" )
}
return nil
}
func (c *Conn ) decryptMessage (b *bin .Buffer ) (*crypto .EncryptedMessageData , error ) {
session := c .session ()
msg , err := c .cipher .DecryptFromBuffer (session .Key , b )
if err != nil {
return nil , errors .Wrap (err , "decrypt" )
}
if msg .SessionID != session .ID {
return nil , errors .Wrapf (errRejected , "invalid session (got %d, expected %d)" , msg .SessionID , session .ID )
}
if err := checkMessageID (c .clock .Now (), msg .MessageID ); err != nil {
return nil , errors .Wrapf (err , "bad message id %d" , msg .MessageID )
}
if !c .messageIDBuf .Consume (msg .MessageID ) {
return nil , errors .Wrapf (errRejected , "duplicate or too low message id %d" , msg .MessageID )
}
return msg , nil
}
func (c *Conn ) consumeMessage (ctx context .Context , buf *bin .Buffer ) error {
msg , err := c .decryptMessage (buf )
if errors .Is (err , errRejected ) {
c .log .Warn ("Ignoring rejected message" , zap .Error (err ))
return nil
}
if err != nil {
return errors .Wrap (err , "consume message" )
}
if err := c .handleMessage (msg .MessageID , &bin .Buffer {Buf : msg .Data ()}); err != nil {
c .log .Warn ("Error while handling message" , zap .Error (err ))
}
needAck := (msg .SeqNo & 0x01 ) != 0
if needAck {
select {
case <- ctx .Done ():
return ctx .Err ()
case c .ackSendChan <- msg .MessageID :
}
}
return nil
}
func (c *Conn ) noUpdates (err error ) bool {
var syscall *net .OpError
if errors .As (err , &syscall ) && syscall .Timeout () {
c .log .Debug ("No updates" )
return true
}
return false
}
func (c *Conn ) handleAuthKeyNotFound (ctx context .Context ) error {
if c .session ().ID == 0 {
c .log .Warn ("BUG: zero session id found" )
}
c .log .Warn ("Re-generating keys (server not found key that we provided)" )
if err := c .createAuthKey (ctx ); err != nil {
return errors .Wrap (err , "unable to create auth key" )
}
c .log .Info ("Re-created auth keys" )
return nil
}
func (c *Conn ) readLoop (ctx context .Context ) (err error ) {
log := c .log .Named ("read" )
log .Debug ("Read loop started" )
defer func () {
l := log
if err != nil {
l = log .With (zap .NamedError ("reason" , err ))
}
l .Debug ("Read loop done" )
}()
var (
lastErr atomic .Value
handlers sync .WaitGroup
)
defer handlers .Wait ()
for {
buf := &bin .Buffer {}
if err , ok := lastErr .Load ().(error ); ok && err != nil {
return errors .Wrap (err , "halting" )
}
if err := c .conn .Recv (ctx , buf ); err != nil {
select {
case <- ctx .Done ():
return ctx .Err ()
default :
if c .noUpdates (err ) {
continue
}
}
var protoErr *codec .ProtocolErr
if errors .As (err , &protoErr ) && protoErr .Code == codec .CodeAuthKeyNotFound {
if err := c .handleAuthKeyNotFound (ctx ); err != nil {
return errors .Wrap (err , "auth key not found" )
}
continue
}
select {
case <- ctx .Done ():
return errors .Wrap (ctx .Err (), "read loop" )
default :
return errors .Wrap (err , "read" )
}
}
handlers .Add (1 )
go func () {
defer handlers .Done ()
if err := c .consumeMessage (ctx , buf ); err != nil {
log .Error ("Failed to process message" , zap .Error (err ))
lastErr .Store (errors .Wrap (err , "consume" ))
}
}()
}
}
The pages are generated with Golds v0.6.7 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds .