package manager
import (
"context"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"go.uber.org/zap"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
"github.com/gotd/td/internal/mtproto"
"github.com/gotd/td/internal/pool"
"github.com/gotd/td/internal/tdsync"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
)
// protoConn is the minimal set of operations Conn needs from the
// underlying protocol connection stored in Conn.proto.
type protoConn interface {
	// Invoke sends input and decodes the response into output.
	Invoke(ctx context.Context, input bin.Encoder, output bin.Decoder) error
	// Run runs the connection; Conn.Run passes its initialization
	// routine as f.
	Run(ctx context.Context, f func(ctx context.Context) error) error
	// Ping is forwarded to by Conn.Ping.
	Ping(ctx context.Context) error
}
// ConnMode selects how a Conn issues its requests.
//
// In this file the mode only affects request wrapping: every mode other
// than ConnModeUpdates wraps requests in tg.InvokeWithoutUpdatesRequest.
type ConnMode byte

const (
	// ConnModeUpdates sends requests as-is, without the
	// invokeWithoutUpdates wrapper.
	ConnModeUpdates ConnMode = iota
	// ConnModeData wraps every request in invokeWithoutUpdates.
	ConnModeData
	// ConnModeCDN wraps every request in invokeWithoutUpdates.
	// NOTE(review): presumably used for CDN datacenters — confirm with callers.
	ConnModeCDN
)
// Conn wraps a protocol connection, performs connection initialization
// and tracks in-flight invocations.
type Conn struct {
	// mode decides whether requests are wrapped in invokeWithoutUpdates.
	mode ConnMode
	// proto is the underlying protocol connection.
	proto protoConn

	// appID and device are sent in the initConnection request.
	appID  int
	device DeviceConfig
	// setup, when non-nil, is called during init after the config
	// has been fetched.
	setup SetupCallback

	// clock is the time source used for invoke tracking.
	clock clock.Clock
	log   *zap.Logger
	// handler receives OnSession and OnMessage notifications.
	handler Handler

	// cfg is the config fetched during init.
	cfg tg.Config
	// ongoing is the number of Invoke calls currently in flight.
	ongoing int
	// latest is the time of the most recent invoke start/finish or
	// of init completion.
	latest time.Time
	// mux is held while accessing cfg, ongoing and latest.
	mux sync.Mutex

	// sessionInit is signalled by OnSession.
	sessionInit *tdsync.Ready
	// gotConfig is signalled once init has stored cfg.
	gotConfig *tdsync.Ready
	// dead is signalled when init fails or Run returns.
	dead *tdsync.Ready

	// connBackoff builds the backoff policy used to retry
	// connection initialization.
	connBackoff func(ctx context.Context) backoff.BackOff
}
// OnSession signals that the session is initialized and forwards the
// session, together with the fetched config, to the handler.
//
// It blocks until init has stored the config. If the connection dies
// first, it returns nil without calling the handler.
func (c *Conn) OnSession(session mtproto.Session) error {
	c.log.Info("SessionInit")
	c.sessionInit.Signal()

	// The session may be reported before init has stored the config,
	// so wait for whichever comes first: config or connection death.
	select {
	case <-c.dead.Ready():
		return nil
	case <-c.gotConfig.Ready():
	}

	// Take a snapshot of the config under the lock; the handler is
	// called without holding it.
	var snapshot tg.Config
	c.mux.Lock()
	snapshot = c.cfg
	c.mux.Unlock()

	return c.handler.OnSession(snapshot, session)
}
// trackInvoke registers the start of an invocation and returns the
// function that must be called when that invocation finishes.
//
// Both the start and the finish update c.latest; the finish also logs
// the invocation duration and the remaining number of ongoing invokes.
func (c *Conn) trackInvoke() func() {
	began := c.clock.Now()

	c.mux.Lock()
	c.latest = began
	c.ongoing++
	c.mux.Unlock()

	return func() {
		c.mux.Lock()
		defer c.mux.Unlock()

		finished := c.clock.Now()
		c.ongoing--
		c.latest = finished

		c.log.Debug("Invoke",
			zap.Duration("duration", finished.Sub(began)),
			zap.Int("ongoing", c.ongoing),
		)
	}
}
// Run runs the underlying protocol connection, initializing it via init.
//
// The dead signal is always raised when Run returns. A non-nil error is
// logged at debug level unless ctx has already been cancelled.
func (c *Conn) Run(ctx context.Context) (err error) {
	defer c.dead.Signal()
	defer func() {
		if err == nil || ctx.Err() != nil {
			return
		}
		c.log.Debug("Connection dead", zap.Error(err))
	}()

	// Raise the dead signal as soon as init fails, not only when Run
	// returns: OnSession waits on gotConfig or dead and would
	// otherwise keep waiting for a config that never arrives.
	initialize := func(ctx context.Context) error {
		if initErr := c.init(ctx); initErr != nil {
			c.dead.Signal()
			return initErr
		}
		return nil
	}

	return c.proto.Run(ctx, initialize)
}
// waitSession blocks until the session is initialized.
//
// It returns pool.ErrConnDead if the connection dies first, or ctx.Err()
// if ctx is done first.
func (c *Conn) waitSession(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-c.dead.Ready():
		return pool.ErrConnDead
	case <-c.sessionInit.Ready():
		return nil
	}
}
// Ready returns the channel of the sessionInit signal, which is raised
// by OnSession.
func (c *Conn) Ready() <-chan struct{} {
	initialized := c.sessionInit.Ready()
	return initialized
}
// Invoke waits for the session to be initialized, then sends input over
// the underlying connection and decodes the response into output.
//
// The call is counted as ongoing for its whole duration, including the
// time spent waiting for the session. A waitSession failure is returned
// wrapped with "waitSession".
func (c *Conn) Invoke(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	// Start tracking now; the returned func marks completion on return.
	done := c.trackInvoke()
	defer done()

	err := c.waitSession(ctx)
	if err != nil {
		return errors.Wrap(err, "waitSession")
	}

	// noopDecoder lets the encoder be passed where wrapRequest
	// expects a bin.Object.
	req := c.wrapRequest(noopDecoder{Encoder: input})
	return c.proto.Invoke(ctx, req, output)
}
// OnMessage forwards the received buffer to the handler unchanged.
func (c *Conn) OnMessage(b *bin.Buffer) error {
	err := c.handler.OnMessage(b)
	return err
}
// noopDecoder wraps a bin.Encoder and adds a Decode method that always
// fails, so an encode-only value can be passed to wrapRequest.
type noopDecoder struct {
	bin.Encoder
}
// Decode always returns a "not implemented" error; the buffer is ignored.
func (noopDecoder) Decode(*bin.Buffer) error {
	return errors.New("not implemented")
}
// wrapRequest returns req unchanged for ConnModeUpdates connections and
// wraps it in tg.InvokeWithoutUpdatesRequest for every other mode.
func (c *Conn) wrapRequest(req bin.Object) bin.Object {
	if c.mode == ConnModeUpdates {
		return req
	}
	return &tg.InvokeWithoutUpdatesRequest{Query: req}
}
// init initializes the connection.
//
// It sends invokeWithLayer(initConnection(help.getConfig)), each level
// passed through wrapRequest, and retries with the connBackoff policy
// while the server answers with a flood wait. Any other invoke error
// stops the retries. On success it runs the optional setup callback,
// stores the received config and raises the gotConfig signal.
func (c *Conn) init(ctx context.Context) error {
	c.log.Debug("Initializing")

	// Build the request from the inside out.
	getConfig := c.wrapRequest(&tg.HelpGetConfigRequest{})
	initConn := c.wrapRequest(&tg.InitConnectionRequest{
		APIID:          c.appID,
		DeviceModel:    c.device.DeviceModel,
		SystemVersion:  c.device.SystemVersion,
		AppVersion:     c.device.AppVersion,
		SystemLangCode: c.device.SystemLangCode,
		LangPack:       c.device.LangPack,
		LangCode:       c.device.LangCode,
		Proxy:          c.device.Proxy,
		Params:         c.device.Params,
		Query:          getConfig,
	})
	withLayer := c.wrapRequest(&tg.InvokeWithLayerRequest{
		Layer: tg.Layer,
		Query: initConn,
	})

	var received tg.Config

	// attempt performs one initialization try. Only flood-wait errors
	// are retryable; everything else is marked permanent.
	attempt := func() error {
		invokeErr := c.proto.Invoke(ctx, withLayer, &received)
		switch {
		case invokeErr == nil:
			return nil
		case tgerr.Is(invokeErr, tgerr.ErrFloodWait):
			return errors.Wrap(invokeErr, "flood wait")
		default:
			return backoff.Permanent(errors.Wrap(invokeErr, "invoke"))
		}
	}
	// onRetry logs every scheduled retry.
	onRetry := func(err error, duration time.Duration) {
		c.log.Debug("Retrying connection initialization",
			zap.Error(err), zap.Duration("duration", duration),
		)
	}

	if err := backoff.RetryNotify(attempt, c.connBackoff(ctx), onRetry); err != nil {
		return errors.Wrap(err, "initConnection")
	}

	// The setup callback runs before the config is published.
	if setup := c.setup; setup != nil {
		if err := setup(ctx, c); err != nil {
			return errors.Wrap(err, "setup")
		}
	}

	c.mux.Lock()
	c.latest = c.clock.Now()
	c.cfg = received
	c.mux.Unlock()

	// Unblocks OnSession, which waits for the config.
	c.gotConfig.Signal()
	return nil
}
// Ping forwards the ping to the underlying protocol connection and
// returns its result.
func (c *Conn) Ping(ctx context.Context) error {
	err := c.proto.Ping(ctx)
	return err
}
// The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.