package manager
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/gotd/log"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
"github.com/gotd/td/mtproto"
"github.com/gotd/td/pool"
"github.com/gotd/td/tdsync"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
)
// protoConn is the set of operations Conn requires from the underlying
// protocol connection stored in Conn.proto.
type protoConn interface {
// Invoke sends input and decodes the response into output.
Invoke (ctx context .Context , input bin .Encoder , output bin .Decoder ) error
// Run runs the connection; Conn.Run passes its initialization routine as f.
Run (ctx context .Context , f func (ctx context .Context ) error ) error
// Ping is exposed to callers through Conn.Ping.
Ping (ctx context .Context ) error
}
// ConnMode selects how Conn initializes itself and wraps outgoing requests.
type ConnMode byte
const (
// ConnModeUpdates is the zero value; requests are sent without the
// InvokeWithoutUpdates wrapper.
ConnModeUpdates ConnMode = iota
// ConnModeData makes wrapRequest wrap requests in
// tg.InvokeWithoutUpdatesRequest.
ConnModeData
// ConnModeCDN makes init skip the config request and makes Invoke use
// the CDN invocation path (invokeCDN).
ConnModeCDN
)
// Conn wraps a protocol connection, performs its initialization and tracks
// session, config and liveness state.
type Conn struct {
// mode selects initialization and request wrapping behavior.
mode ConnMode
// dc is reported as ThisDC in the synthetic config built for CDN mode.
dc int
// proto is the underlying protocol connection.
proto protoConn
// appID is sent as APIID in InitConnection requests.
appID int
// device supplies the device/app fields of InitConnection requests.
device DeviceConfig
// setup, if non-nil, is called by init after the config request succeeded
// (non-CDN modes only).
setup SetupCallback
// onDead, if non-nil, is called by Run when it fails while ctx is not done.
onDead func (error )
// clock is the time source for the latest timestamp and invoke durations.
clock clock .Clock
log log .Helper
// handler receives sessions (together with the config) and raw messages.
handler Handler
// cfg is the config passed to handler.OnSession; set by init.
cfg tg .Config
// cdnNeedsInit reports whether the next CDN invoke must be wrapped in
// InitConnection; see invokeCDN.
cdnNeedsInit atomic .Bool
// pending holds sessions queued by OnSession until they are flushed to
// the handler.
pending []mtproto .Session
// ongoing is the number of invocations currently in flight.
ongoing int
// latest is the clock time of the most recent invoke start/finish or
// completed initialization.
latest time .Time
// mux guards cfg, pending, ongoing and latest in the code shown here.
mux sync .Mutex
// sessionInit is signaled by OnSession.
sessionInit *tdsync .Ready
// gotConfig is signaled once init has stored cfg.
gotConfig *tdsync .Ready
// dead is signaled when init fails or Run returns.
dead *tdsync .Ready
// connBackoff builds the backoff policy used to retry initialization.
connBackoff func (ctx context .Context ) backoff .BackOff
}
// OnSession records a newly created session.
//
// The session is queued in c.pending and sessionInit is signaled. If the
// config is already available the queue is flushed to the handler right away;
// otherwise the session stays queued and nil is returned.
func (c *Conn) OnSession(session mtproto.Session) error {
	c.log.Info(context.Background(), "SessionInit")
	c.sessionInit.Signal()

	c.mux.Lock()
	c.pending = append(c.pending, session)
	c.mux.Unlock()

	if c.configReady() {
		return c.flushPendingSession()
	}
	return nil
}
// configReady reports, without blocking, whether gotConfig has been signaled.
func (c *Conn) configReady() bool {
	ready := false
	select {
	case <-c.gotConfig.Ready():
		ready = true
	default:
		// Not signaled yet; do not wait.
	}
	return ready
}
// flushPendingSession hands every queued session to the handler together with
// the current config.
//
// The queue is copied and emptied under the mutex; the handler is called
// without the lock held. The first handler error is returned immediately and
// the remaining sessions of that batch are not delivered.
func (c *Conn) flushPendingSession() error {
	c.mux.Lock()
	cfg := c.cfg
	queued := make([]mtproto.Session, len(c.pending))
	copy(queued, c.pending)
	c.pending = c.pending[:0]
	c.mux.Unlock()

	// An empty queue simply results in zero iterations.
	for i := range queued {
		if err := c.handler.OnSession(cfg, queued[i]); err != nil {
			return err
		}
	}
	return nil
}
// trackInvoke registers the start of an invocation and returns a function
// that must be called when the invocation finishes.
//
// On start it increments c.ongoing and sets c.latest to the start time. The
// returned function decrements c.ongoing, sets c.latest to the finish time and
// emits a debug log entry with the elapsed duration and the remaining number
// of ongoing invocations.
func (c *Conn) trackInvoke() func() {
	start := c.clock.Now()

	c.mux.Lock()
	c.ongoing++
	c.latest = start
	c.mux.Unlock()

	return func() {
		c.mux.Lock()
		c.ongoing--
		end := c.clock.Now()
		c.latest = end
		// Snapshot the counter so it can be logged after unlocking.
		ongoing := c.ongoing
		c.mux.Unlock()

		// Log outside the critical section: the logger may perform I/O and
		// must not delay other goroutines contending for c.mux.
		c.log.Debug(context.Background(), "Invoke",
			log.Duration("duration", end.Sub(start)),
			log.Int("ongoing", ongoing),
		)
	}
}
// Run runs the underlying protocol connection, initializing it via c.init.
//
// c.dead is signaled when init fails and, in any case, when Run returns. If
// Run ends with an error while ctx is still alive, the error is logged and
// reported through c.onDead (when set).
func (c *Conn) Run(ctx context.Context) (err error) {
	defer c.dead.Signal()
	defer func() {
		// Nothing to report on clean exit or on caller-requested shutdown.
		if err == nil || ctx.Err() != nil {
			return
		}
		c.log.Debug(ctx, "Connection dead", log.Error(err))
		if c.onDead != nil {
			c.onDead(err)
		}
	}()

	return c.proto.Run(ctx, func(ctx context.Context) error {
		if initErr := c.init(ctx); initErr != nil {
			// Mark the connection dead right away so waiters are released.
			c.dead.Signal()
			return initErr
		}
		return nil
	})
}
// waitSession blocks until the config has been obtained, the connection is
// marked dead, or ctx is done.
//
// It returns nil once gotConfig is signaled, pool.ErrConnDead when dead is
// signaled, and ctx.Err() when the context ends.
func (c *Conn) waitSession(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-c.dead.Ready():
		return pool.ErrConnDead
	case <-c.gotConfig.Ready():
		return nil
	}
}
// Ready returns the channel exposed by gotConfig, which is signaled once
// initialization has stored the config.
func (c *Conn) Ready() <-chan struct{} {
	ready := c.gotConfig.Ready()
	return ready
}
// Invoke sends input over the connection and decodes the response into output.
//
// The call is tracked via trackInvoke and first waits for the config to be
// ready (see waitSession). In CDN mode the request goes through invokeCDN;
// otherwise it is wrapped with wrapRequest, placed into an InvokeWithLayer
// request, wrapped again and sent through the protocol connection.
func (c *Conn) Invoke(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	done := c.trackInvoke()
	defer done()

	if err := c.waitSession(ctx); err != nil {
		return errors.Wrap(err, "waitSession")
	}

	if c.mode == ConnModeCDN {
		return c.invokeCDN(ctx, input, output)
	}

	layered := &tg.InvokeWithLayerRequest{
		Layer: tg.Layer,
		Query: c.wrapRequest(noopDecoder{input}),
	}
	return c.proto.Invoke(ctx, c.wrapRequest(layered), output)
}
// invokeCDN performs an invocation on a CDN connection.
//
// While cdnNeedsInit is set, the request is sent wrapped in the CDN
// initialization request. Otherwise it is sent raw first; if that fails with
// an error accepted by shouldCDNRetryWrapped, cdnNeedsInit is set and the
// request is retried wrapped. A successful wrapped invocation clears
// cdnNeedsInit.
func (c *Conn) invokeCDN(
	ctx context.Context,
	input bin.Encoder,
	output bin.Decoder,
) error {
	if !c.cdnNeedsInit.Load() {
		rawErr := c.invokeCDNRaw(ctx, input, output)
		if rawErr == nil {
			return nil
		}
		if !c.shouldCDNRetryWrapped(rawErr) {
			return rawErr
		}
		// The server asked for (re)initialization: fall through to the
		// wrapped attempt below.
		c.cdnNeedsInit.Store(true)
	}

	if wrappedErr := c.invokeCDNWrapped(ctx, input, output); wrappedErr != nil {
		return wrappedErr
	}
	c.cdnNeedsInit.Store(false)
	return nil
}
// invokeCDNWrapped sends input wrapped in the CDN InitConnection request,
// which is in turn wrapped in an InvokeWithLayer request.
func (c *Conn) invokeCDNWrapped(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	initReq := c.cdnInitRequest(noopDecoder{input})
	layered := &tg.InvokeWithLayerRequest{
		Layer: tg.Layer,
		Query: initReq,
	}
	return c.proto.Invoke(ctx, layered, output)
}
// invokeCDNRaw sends input through the protocol connection as is, without any
// wrapping.
func (c *Conn) invokeCDNRaw(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	err := c.proto.Invoke(ctx, input, output)
	return err
}
// shouldCDNRetryWrapped reports whether err is an RPC error of type
// CONNECTION_NOT_INITED or CONNECTION_LAYER_INVALID, in which case the CDN
// request is retried wrapped in the initialization request.
func (c *Conn) shouldCDNRetryWrapped(err error) bool {
	if err == nil {
		return false
	}
	rpcErr, ok := tgerr.As(err)
	if !ok {
		return false
	}
	return rpcErr.IsOneOf(
		"CONNECTION_NOT_INITED",
		"CONNECTION_LAYER_INVALID",
	)
}
// OnMessage forwards the received buffer to the handler and returns its
// result.
func (c *Conn) OnMessage(b *bin.Buffer) error {
	err := c.handler.OnMessage(b)
	return err
}
// noopDecoder embeds a bin.Encoder and adds a Decode method that always
// fails, so the value can be used where both encoding and decoding methods
// are required (it is passed as a bin.Object in this file).
type noopDecoder struct {
bin .Encoder
}
// Decode always fails: noopDecoder only supports encoding.
func (noopDecoder) Decode(_ *bin.Buffer) error {
	return errors.New("not implemented")
}
// wrapRequest wraps req in InvokeWithoutUpdates when the connection is in
// ConnModeData; in every other mode req is returned unchanged.
func (c *Conn) wrapRequest(req bin.Object) bin.Object {
	if c.mode != ConnModeData {
		return req
	}
	return &tg.InvokeWithoutUpdatesRequest{Query: req}
}
// cdnInitRequest builds the InitConnection request used on CDN connections
// around query.
//
// DeviceModel and SystemVersion are sent as "n/a"; the remaining fields come
// from c.appID and c.device.
func (c *Conn) cdnInitRequest(query bin.Object) bin.Object {
	dev := c.device
	initReq := &tg.InitConnectionRequest{
		APIID:          c.appID,
		DeviceModel:    "n/a",
		SystemVersion:  "n/a",
		AppVersion:     dev.AppVersion,
		SystemLangCode: dev.SystemLangCode,
		LangPack:       dev.LangPack,
		LangCode:       dev.LangCode,
		Proxy:          dev.Proxy,
		Params:         dev.Params,
		Query:          query,
	}
	return initReq
}
// init initializes the connection.
//
// In CDN mode no request is sent: cdnNeedsInit is set, a synthetic config
// carrying only ThisDC is stored, gotConfig is signaled and queued sessions
// are flushed.
//
// In other modes a help.getConfig query wrapped in InitConnection and
// InvokeWithLayer is invoked. Flood-wait errors are retried using the policy
// from c.connBackoff; any other error is permanent. After success the optional
// setup callback runs, then the received config is stored, gotConfig is
// signaled and queued sessions are flushed.
func (c *Conn) init(ctx context.Context) error {
	c.log.Debug(ctx, "Initializing")

	// publish stores the config, marks the connection as ready and delivers
	// the sessions that were queued while the config was unavailable.
	publish := func(cfg tg.Config) error {
		c.mux.Lock()
		c.latest = c.clock.Now()
		c.cfg = cfg
		c.mux.Unlock()

		c.gotConfig.Signal()
		return c.flushPendingSession()
	}

	if c.mode == ConnModeCDN {
		c.cdnNeedsInit.Store(true)
		return publish(tg.Config{ThisDC: c.dc})
	}

	initConn := &tg.InitConnectionRequest{
		APIID:          c.appID,
		DeviceModel:    c.device.DeviceModel,
		SystemVersion:  c.device.SystemVersion,
		AppVersion:     c.device.AppVersion,
		SystemLangCode: c.device.SystemLangCode,
		LangPack:       c.device.LangPack,
		LangCode:       c.device.LangCode,
		Proxy:          c.device.Proxy,
		Params:         c.device.Params,
		Query:          c.wrapRequest(&tg.HelpGetConfigRequest{}),
	}
	req := c.wrapRequest(&tg.InvokeWithLayerRequest{
		Layer: tg.Layer,
		Query: c.wrapRequest(initConn),
	})

	var cfg tg.Config
	attempt := func() error {
		err := c.proto.Invoke(ctx, req, &cfg)
		switch {
		case err == nil:
			return nil
		case tgerr.Is(err, tgerr.ErrFloodWait):
			// Retryable: let the backoff schedule another attempt.
			return errors.Wrap(err, "flood wait")
		default:
			return backoff.Permanent(errors.Wrap(err, "invoke"))
		}
	}
	onRetry := func(err error, duration time.Duration) {
		c.log.Debug(ctx, "Retrying connection initialization",
			log.Error(err), log.Duration("duration", duration),
		)
	}
	if err := backoff.RetryNotify(attempt, c.connBackoff(ctx), onRetry); err != nil {
		return errors.Wrap(err, "initConnection")
	}

	if c.setup != nil {
		if err := c.setup(ctx, c); err != nil {
			return errors.Wrap(err, "setup")
		}
	}

	return publish(cfg)
}
// Ping delegates to the Ping method of the underlying protocol connection.
func (c *Conn) Ping(ctx context.Context) error {
	err := c.proto.Ping(ctx)
	return err
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.