package telegram
import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"go.uber.org/multierr"
"github.com/gotd/log"
"github.com/gotd/td/exchange"
"github.com/gotd/td/tdsync"
"github.com/gotd/td/telegram/auth"
"github.com/gotd/td/tgerr"
)
// runUntilRestart runs the current primary connection together with its
// helper goroutines and returns the result of waiting on the whole group.
//
// The group consists of:
//   - the connection's Run loop;
//   - if a connection-state callback is configured and the connection exposes
//     a Ready() channel, a waiter that reports ConnectionStateReady;
//   - unless noUpdatesMode is set, a one-shot Self request whose outcome is
//     forwarded to onSelfSuccess / onSelfError;
//   - a watcher that cancels the group once a receive from c.restart succeeds.
func (c *Client) runUntilRestart(ctx context.Context) error {
	c.notifyConnectionState(ConnectionStateConnecting)

	// Read the connection while holding the mutex; the local copy is used for
	// the rest of this call.
	c.connMux.Lock()
	primary := c.conn
	c.connMux.Unlock()

	group := tdsync.NewCancellableGroup(ctx)
	group.Go(primary.Run)

	if c.onConnectionState != nil {
		// Ready() is optional: only connections that provide it get a waiter.
		notifier, hasReady := primary.(interface{ Ready() <-chan struct{} })
		if hasReady {
			group.Go(func(ctx context.Context) error {
				select {
				case <-notifier.Ready():
					c.notifyConnectionState(ConnectionStateReady)
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			})
		}
	}

	if !c.noUpdatesMode {
		group.Go(func(ctx context.Context) error {
			self, selfErr := c.Self(ctx)
			if selfErr == nil {
				if onSuccess := c.onSelfSuccess; onSuccess != nil {
					onSuccess(self)
				}
				c.log.Info(ctx, "Got self", log.String("username", self.Username))
				return nil
			}

			// Unauthorized errors are not logged as warnings.
			if !auth.IsUnauthorized(selfErr) {
				c.log.Warn(ctx, "Got error on self", log.Error(selfErr))
			}

			// A failed Self request only fails the group when the hook itself
			// returns an error.
			onError := c.onSelfError
			if onError == nil {
				return nil
			}
			if hookErr := onError(ctx, selfErr); hookErr != nil {
				return errors.Wrap(hookErr, "onSelfError")
			}
			return nil
		})
	}

	group.Go(func(ctx context.Context) error {
		select {
		case <-c.restart:
			c.log.Debug(ctx, "Restart triggered")
			// Stop every sibling goroutine so that Wait returns.
			group.Cancel()
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})

	return group.Wait()
}
// isPermanentError reports whether err matches any of the conditions below.
// reconnectUntilClosed wraps such errors in backoff.Permanent instead of
// returning them for another retry.
//
// The conditions, checked in order:
//   - err is exchange.ErrKeyFingerprintNotFound;
//   - err is one of the Telegram errors AUTH_KEY_UNREGISTERED,
//     SESSION_EXPIRED or AUTH_KEY_DUPLICATED;
//   - auth.IsUnauthorized(err) is true.
func (c *Client) isPermanentError(err error) bool {
	return errors.Is(err, exchange.ErrKeyFingerprintNotFound) ||
		tgerr.Is(err, "AUTH_KEY_UNREGISTERED", "SESSION_EXPIRED", "AUTH_KEY_DUPLICATED") ||
		auth.IsUnauthorized(err)
}
// reconnectUntilClosed repeatedly calls runUntilRestart under a backoff
// policy bound to ctx and returns whatever backoff.RetryNotify returns.
//
// The policy is also stored in c.connBackoff, where onReady resets it.
// Before each retry the notify callback logs the failure, reports
// ConnectionStateDisconnected and swaps in a freshly created primary
// connection.
func (c *Client) reconnectUntilClosed(ctx context.Context) error {
	policy := tdsync.SyncBackoff(backoff.WithContext(c.newConnBackoff(), ctx))
	c.connBackoff.Store(&policy)

	// attempt runs one connection cycle; errors classified as permanent are
	// wrapped in backoff.Permanent.
	attempt := func() error {
		runErr := c.runUntilRestart(ctx)
		switch {
		case runErr == nil:
			return nil
		case c.isPermanentError(runErr):
			return backoff.Permanent(runErr)
		default:
			return runErr
		}
	}

	// beforeRetry is the notify callback handed to backoff.RetryNotify.
	beforeRetry := func(cause error, delay time.Duration) {
		c.log.Info(context.Background(), "Restarting connection", log.Error(cause), log.Duration("backoff", delay))
		c.notifyConnectionState(ConnectionStateDisconnected)

		// Replace the connection while holding connMux.
		c.connMux.Lock()
		c.handlePrimaryConnDead(cause)
		c.replaceConn(c.createPrimaryConn(nil))
		c.connMux.Unlock()
	}

	return backoff.RetryNotify(attempt, policy, beforeRetry)
}
// onReady logs a debug message, signals c.ready, and resets the backoff
// stored in c.connBackoff if one has been stored.
func (c *Client) onReady() {
	c.log.Debug(context.Background(), "Ready")
	c.ready.Signal()

	stored := c.connBackoff.Load()
	if stored == nil {
		// No backoff has been stored yet, so there is nothing to reset.
		return
	}
	(*stored).Reset()
}
// resetReady calls Reset on c.ready. Run invokes it before restoring the
// connection.
// NOTE(review): presumably this clears a previously signalled ready state so
// that waiters block again; confirm against the type of c.ready.
func (c *Client ) resetReady () {
c .ready .Reset ()
}
// Run starts the client, calls f once the client is ready, and blocks until
// the background goroutine group has finished.
//
// Parameters:
//   - ctx: parent context; c.ctx is derived from it and the goroutine group is
//     bound to it.
//   - f: callback invoked with the group context after c.ready fires.
//
// Returns:
//   - an error wrapping c.ctx.Err() with "client already closed" if c.ctx was
//     set by an earlier call and is already done;
//   - the error from restoreConnection, if it fails;
//   - the error from the goroutine group, unless it is context.Canceled, in
//     which case nil is returned;
//   - additionally, errors from closing sub-connections and CDN connections
//     are appended to the named result err by the deferred cleanup.
func (c *Client ) Run (ctx context .Context , f func (ctx context .Context ) error ) (err error ) {
// Refuse to run when a context from a previous run exists and is done.
if c .ctx != nil {
select {
case <- c .ctx .Done ():
return errors .Wrap (c .ctx .Err (), "client already closed" )
default :
}
}
c .ctx , c .cancel = context .WithCancel (ctx )
c .log .Info (ctx , "Starting" )
// Deferred calls run in reverse order: c.cancel() (registered below) runs
// first, then the connection cleanup, and finally this "Closed" log line.
defer c .log .Info (context .Background (), "Closed" )
defer func () {
// Copy the sub-connections out and replace the map with an empty one while
// holding the mutex; the Close calls below happen after it is released.
c .subConnsMux .Lock ()
subConns := make ([]CloseInvoker , 0 , len (c .subConns ))
for _ , conn := range c .subConns {
subConns = append (subConns , conn )
}
c .subConns = map [int ]CloseInvoker {}
c .subConnsMux .Unlock ()
cdnConns := c .cdnPools .drain ()
// Close errors that match context.Canceled are dropped; the rest are
// appended to the named result err.
for _ , conn := range subConns {
if closeErr := conn .Close (); !errors .Is (closeErr , context .Canceled ) {
multierr .AppendInto (&err , closeErr )
}
}
for _ , conn := range cdnConns {
if closeErr := conn .Close (); !errors .Is (closeErr , context .Canceled ) {
multierr .AppendInto (&err , closeErr )
}
}
}()
defer c .cancel ()
c .resetReady ()
if err := c .restoreConnection (ctx ); err != nil {
return err
}
g := tdsync .NewCancellableGroup (ctx )
// Connection loop: keeps running and reconnecting the primary connection.
g .Go (c .reconnectUntilClosed )
// Link the two contexts: when the group context ends, cancel c.ctx; when
// c.ctx ends, return its error to the group.
g .Go (func (ctx context .Context ) error {
select {
case <- ctx .Done ():
c .cancel ()
return ctx .Err ()
case <- c .ctx .Done ():
return c .ctx .Err ()
}
})
// Wait for readiness, then run the user callback. A callback error is
// wrapped with "callback"; a successful return cancels the whole group.
g .Go (func (ctx context .Context ) error {
select {
case <- ctx .Done ():
return ctx .Err ()
case <- c .ready .Ready ():
if err := f (ctx ); err != nil {
return errors .Wrap (err , "callback" )
}
c .log .Debug (ctx , "Callback returned, stopping" )
g .Cancel ()
return nil
}
})
// context.Canceled from the group is treated as a normal shutdown.
if err := g .Wait (); !errors .Is (err , context .Canceled ) {
return err
}
return nil
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.