package mtproto
import (
"context"
"io"
"sync"
"time"
"github.com/go-faster/errors"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
"github.com/gotd/td/internal/crypto"
"github.com/gotd/td/internal/exchange"
"github.com/gotd/td/internal/mtproto/salts"
"github.com/gotd/td/internal/proto"
"github.com/gotd/td/internal/rpc"
"github.com/gotd/td/internal/tdsync"
"github.com/gotd/td/internal/tmap"
"github.com/gotd/td/transport"
)
// Handler is the callback interface a Conn is configured with; New stores
// opt.Handler in the connection's handler field.
type Handler interface {
// OnMessage receives a *bin.Buffer and may return an error.
// NOTE(review): presumably invoked with the payload of each incoming
// message — confirm against the read loop, which is not visible here.
OnMessage (b *bin .Buffer ) error
// OnSession receives a Session value and may return an error.
// NOTE(review): presumably invoked once a session is established — confirm
// against the caller.
OnSession (session Session ) error
}
// MessageIDSource produces int64 identifiers for a given proto.MessageType.
// New stores opt.MessageID in the connection's messageID field.
type MessageIDSource interface {
// New returns an int64 for message type t.
// NOTE(review): presumably a fresh, unique message id — confirm against the
// implementation supplied through Options.
New (t proto .MessageType ) int64
}
// MessageBuf is the interface of the connection's messageIDBuf field, which
// New initialises with proto.NewMessageIDBuf(100).
type MessageBuf interface {
// Consume takes a message id and reports a boolean result.
// NOTE(review): presumably false means the id was already seen and must be
// rejected — confirm against proto.MessageIDBuf.
Consume (id int64 ) bool
}
// Cipher abstracts the message encryption used by a Conn. New stores
// opt.Cipher in the connection's cipher field.
type Cipher interface {
// DecryptFromBuffer takes an auth key and a buffer and returns the decoded
// *crypto.EncryptedMessageData, or an error.
DecryptFromBuffer (k crypto .AuthKey , buf *bin .Buffer ) (*crypto .EncryptedMessageData , error )
// Encrypt takes an auth key, the message data and a buffer b, and returns
// an error on failure.
// NOTE(review): presumably the ciphertext is written into b — confirm.
Encrypt (key crypto .AuthKey , data crypto .EncryptedMessageData , b *bin .Buffer ) error
}
// Dialer is a function that, given a context, returns a transport.Conn or an
// error. The Dialer passed to New is kept in the connection's dialer field.
type Dialer func (ctx context .Context ) (transport .Conn , error )
// Conn holds the state of a single connection built by New and driven by Run.
// It contains mutexes and an atomic flag, so it must be used through a
// pointer and never copied.
type Conn struct {
// dcID is taken from opt.DC.
dcID int
// dialer is the Dialer passed to New.
dialer Dialer
// conn is the underlying transport connection; handleClose closes it.
// New does not set it.
// NOTE(review): presumably assigned during connect — confirm.
conn transport .Conn
// handler is taken from opt.Handler.
handler Handler
// rpc is opt.engine when provided; otherwise New builds one with rpc.New.
// handleClose calls ForceClose on it.
rpc *rpc .Engine
// rsaPublicKeys is taken from opt.PublicKeys.
rsaPublicKeys []exchange .PublicKey
// types is taken from opt.Types.
types *tmap .Map
// clock is taken from opt.Clock.
clock clock .Clock
// rand is taken from opt.Random.
rand io .Reader
// cipher is taken from opt.Cipher.
cipher Cipher
// log is taken from opt.Logger.
log *zap .Logger
// messageID is taken from opt.MessageID.
messageID MessageIDSource
// messageIDBuf is created in New via proto.NewMessageIDBuf(100).
messageIDBuf MessageBuf
// NOTE(review): sessionMux presumably guards authKey, salt and sessionID —
// confirm against their accessors.
sessionMux sync .RWMutex
// authKey starts as opt.Key.
authKey crypto .AuthKey
// salt starts as opt.Salt.
salt int64
// sessionID is left at its zero value by New.
sessionID int64
// salts is left at its zero value by New.
salts salts .Salts
// sentContentMessages is left at zero by New.
// NOTE(review): presumably a counter of sent content messages — confirm.
sentContentMessages int32
// NOTE(review): reqMux presumably serialises request sending — confirm.
reqMux sync .Mutex
// ackSendChan is an unbuffered channel of int64 created in New.
ackSendChan chan int64
// ackBatchSize is taken from opt.AckBatchSize.
ackBatchSize int
// ackInterval is taken from opt.AckInterval.
ackInterval time .Duration
// ping maps an int64 key to a signalling channel; New initialises it empty.
ping map [int64 ]chan struct {}
// NOTE(review): pingMux presumably guards the ping map — confirm.
pingMux sync .Mutex
// pingTimeout is taken from opt.PingTimeout.
pingTimeout time .Duration
// pingInterval is taken from opt.PingInterval.
pingInterval time .Duration
// gotSession is created in New via tdsync.NewReady().
gotSession *tdsync .Ready
// NOTE(review): exchangeLock presumably protects the key-exchange phase —
// confirm against connect.
exchangeLock sync .RWMutex
// compressThreshold is taken from opt.CompressThreshold.
compressThreshold int
// dialTimeout is taken from opt.DialTimeout.
dialTimeout time .Duration
// exchangeTimeout is taken from opt.ExchangeTimeout.
exchangeTimeout time .Duration
// saltFetchInterval is taken from opt.SaltFetchInterval.
saltFetchInterval time .Duration
// getTimeout is taken from opt.RequestTimeout; it maps a uint32 request
// value to a duration.
getTimeout func (req uint32 ) time .Duration
// ran is flipped to true by the first call to Run; later calls see the
// flag already set and return an error.
ran atomic .Bool
}
// New builds a Conn that will use dialer to obtain its transport connection
// and takes the rest of its configuration from opt.
//
// opt.setDefaults is called first, so every option read below has already
// been through defaulting. If opt carries a pre-built RPC engine it is used
// as is; otherwise a fresh engine is created that sends through the
// connection's writeContentMessage and reports drops to dropRPC.
//
// New only assembles state; it does not dial.
func New(dialer Dialer, opt Options) *Conn {
	opt.setDefaults()

	c := &Conn{
		// Identity and transport.
		dcID:          opt.DC,
		dialer:        dialer,
		rsaPublicKeys: opt.PublicKeys,

		// Injected collaborators.
		handler:   opt.Handler,
		types:     opt.Types,
		clock:     opt.Clock,
		rand:      opt.Random,
		cipher:    opt.Cipher,
		log:       opt.Logger,
		messageID: opt.MessageID,
		rpc:       opt.engine,

		// Initial session credentials.
		authKey: opt.Key,
		salt:    opt.Salt,

		// State allocated per connection.
		messageIDBuf: proto.NewMessageIDBuf(100),
		ackSendChan:  make(chan int64),
		ping:         make(map[int64]chan struct{}),
		gotSession:   tdsync.NewReady(),

		// Tunables.
		ackInterval:       opt.AckInterval,
		ackBatchSize:      opt.AckBatchSize,
		pingTimeout:       opt.PingTimeout,
		pingInterval:      opt.PingInterval,
		compressThreshold: opt.CompressThreshold,
		dialTimeout:       opt.DialTimeout,
		exchangeTimeout:   opt.ExchangeTimeout,
		saltFetchInterval: opt.SaltFetchInterval,
		getTimeout:        opt.RequestTimeout,
	}

	// An engine supplied through the options wins; nothing more to build.
	if c.rpc != nil {
		return c
	}

	// The engine needs method values bound to c, which is why it can only be
	// created after the Conn itself exists.
	engineOpts := rpc.Options{
		Logger:        opt.Logger.Named("rpc"),
		RetryInterval: opt.RetryInterval,
		MaxRetries:    opt.MaxRetries,
		Clock:         opt.Clock,
		DropHandler:   c.dropRPC,
	}
	c.rpc = rpc.New(c.writeContentMessage, engineOpts)

	return c
}
// handleClose blocks until ctx is done and then tears the connection down:
// it force-closes the RPC engine and closes the transport connection.
//
// It always returns nil; an error from closing the transport is logged at
// debug level rather than propagated.
func (c *Conn) handleClose(ctx context.Context) error {
	// Nothing to do until the surrounding context is cancelled.
	<-ctx.Done()

	c.log.Debug("Closing")
	c.rpc.ForceClose()

	closeErr := c.conn.Close()
	if closeErr != nil {
		c.log.Debug("Failed to cleanup connection", zap.Error(closeErr))
	}

	return nil
}
// Run connects and then runs the connection's background loops together with
// the user callback f until the task group finishes.
//
// A Conn can be run only once: the first call sets the ran flag, and any
// later call returns an error immediately. The context handed to connect and
// to the task group is derived from ctx and is cancelled when Run returns.
//
// A connect failure is returned wrapped with "start"; an error from the task
// group is returned wrapped with "group". Otherwise Run returns nil.
func (c *Conn) Run(ctx context.Context, f func(ctx context.Context) error) error {
	if alreadyRan := c.ran.Swap(true); alreadyRan {
		return errors.New("do Run on closed connection")
	}

	runCtx, stop := context.WithCancel(ctx)
	defer stop()

	c.log.Debug("Run: start")
	defer c.log.Debug("Run: end")

	if err := c.connect(runCtx); err != nil {
		return errors.Wrap(err, "start")
	}

	// Tasks are registered in exactly this order.
	tasks := [...]struct {
		name string
		run  func(context.Context) error
	}{
		{"handleClose", c.handleClose},
		{"pingLoop", c.pingLoop},
		{"ackLoop", c.ackLoop},
		{"saltsLoop", c.saltLoop},
		{"userCallback", f},
		{"readLoop", c.readLoop},
	}

	// NOTE(review): the group presumably cancels the remaining tasks once one
	// of them fails — confirm against tdsync.LogGroup.
	group := tdsync.NewLogGroup(runCtx, c.log.Named("group"))
	for _, task := range tasks {
		group.Go(task.name, task.run)
	}

	waitErr := group.Wait()
	if waitErr == nil {
		return nil
	}
	return errors.Wrap(waitErr, "group")
}
// The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.