package telegram
import (
"context"
"sync/atomic"
"github.com/go-faster/errors"
"github.com/gotd/log"
"github.com/gotd/td/mtproto"
"github.com/gotd/td/pool"
"github.com/gotd/td/telegram/auth"
"github.com/gotd/td/telegram/dcs"
"github.com/gotd/td/telegram/internal/manager"
"github.com/gotd/td/tg"
"github.com/gotd/td/transport"
)
type CloseInvoker interface {
tg .Invoker
Close () error
}
func (c *Client ) createPool (dc int , max int64 , creator func () pool .Conn ) (*pool .DC , error ) {
select {
case <- c .ctx .Done ():
return nil , errors .Wrap (c .ctx .Err (), "client already closed" )
default :
}
p := pool .NewDC (c .ctx , dc , creator , pool .DCOptions {
Logger : c .log .Named ("pool" ).With (log .Int ("dc_id" , dc )).Logger (),
MaxOpenConnections : max ,
})
return p , nil
}
func (c *Client ) Pool (max int64 ) (CloseInvoker , error ) {
if max < 0 {
return nil , errors .Errorf ("invalid max value %d" , max )
}
s := c .session .Load ()
return c .createPool (s .DC , max , func () pool .Conn {
id := c .connsCounter .Inc ()
return c .createConn (id , manager .ConnModeData , nil , func (err error ) {
c .handlePrimaryConnDead (err )
if c .onDead != nil {
c .onDead (err )
}
})
})
}
func (c *Client ) dc (
ctx context .Context ,
dcID int ,
max int64 ,
dialer mtproto .Dialer ,
mode manager .ConnMode ,
) (*pool .DC , error ) {
if max < 0 {
return nil , errors .Errorf ("invalid max value %d" , max )
}
dcList := dcs .FindDCs (c .cfg .Load ().DCOptions , dcID , false )
if len (dcList ) < 1 {
return nil , errors .Errorf ("unknown DC %d" , dcID )
}
c .log .Debug (ctx , "Creating pool" ,
log .Int ("dc_id" , dcID ),
log .Int64 ("max" , max ),
log .Int ("candidates" , len (dcList )),
)
opts := c .opts
if mode == manager .ConnModeCDN {
opts .EnablePFS = false
cdnKeys , set := c .cachedCDNKeysForDC (dcID )
if !set || len (cdnKeys ) == 0 {
fetched , err := c .fetchCDNKeysForDC (ctx , dcID )
if err != nil {
return nil , errors .Wrapf (err , "fetch CDN public keys for DC %d" , dcID )
}
cdnKeys = fetched
}
if len (cdnKeys ) == 0 {
return nil , errors .Errorf ("no CDN public keys available for CDN DC %d" , dcID )
}
opts .PublicKeys = mergePublicKeys (cdnKeys , opts .PublicKeys )
}
var suppressSetup atomic .Bool
p , err := c .createPool (dcID , max , func () pool .Conn {
id := c .connsCounter .Inc ()
c .sessionsMux .Lock ()
sessions := c .sessions
if mode == manager .ConnModeCDN {
sessions = c .cdnSessions
}
session , ok := sessions [dcID ]
if !ok {
session = pool .NewSyncSession (pool .Session {DC : dcID })
sessions [dcID ] = session
}
c .sessionsMux .Unlock ()
options , data := session .Options (opts )
setup := manager .SetupCallback (nil )
handler := c .asHandler ()
if mode != manager .ConnModeCDN &&
data .AuthKey .Zero () &&
c .session .Load ().DC != dcID &&
!suppressSetup .Load () {
setup = c .dcTransferSetup (dcID )
}
if mode == manager .ConnModeCDN {
handler = c .asCDNHandler ()
}
options .Logger = c .log .Named ("conn" ).With (
log .Int64 ("conn_id" , id ),
log .Int ("dc_id" , dcID ),
).Logger ()
return c .create (
dialer , mode , c .appID ,
options , manager .ConnOptions {
DC : dcID ,
Device : c .device ,
Handler : handler ,
Setup : setup ,
OnDead : func (err error ) {
if mode == manager .ConnModeCDN {
c .handleCDNConnDead (dcID , err )
return
}
c .handleDCConnDead (dcID , err )
},
},
)
})
if err != nil {
return nil , errors .Wrap (err , "create pool" )
}
if mode == manager .ConnModeCDN {
return p , nil
}
suppressSetup .Store (true )
_, err = c .transfer (ctx , tg .NewClient (p ), dcID )
suppressSetup .Store (false )
if err != nil {
if auth .IsUnauthorized (err ) {
return p , nil
}
_ = p .Close ()
return nil , errors .Wrap (err , "transfer" )
}
return p , nil
}
func (c *Client ) DC (ctx context .Context , dc int , max int64 ) (CloseInvoker , error ) {
return c .dc (ctx , dc , max , c .primaryDC (dc ), manager .ConnModeData )
}
func (c *Client ) MediaOnly (ctx context .Context , dc int , max int64 ) (CloseInvoker , error ) {
return c .dc (ctx , dc , max , func (ctx context .Context ) (transport .Conn , error ) {
return c .resolver .MediaOnly (ctx , dc , c .dcList ())
}, manager .ConnModeData )
}
func (c *Client ) CDN (ctx context .Context , dc int , max int64 ) (CloseInvoker , error ) {
if max < 0 {
return nil , errors .Errorf ("invalid max value %d" , max )
}
need := normalizeCDNPoolMax (max )
if cached , ok := c .cdnPools .acquire (dc , need ); ok {
return cached , nil
}
created , err := c .dc (ctx , dc , need , func (ctx context .Context ) (transport .Conn , error ) {
return c .resolver .CDN (ctx , dc , c .dcList ())
}, manager .ConnModeCDN )
if err != nil {
return nil , err
}
handle , reused := c .cdnPools .publishOrAcquire (dc , need , created )
if reused {
_ = created .Close ()
return handle , nil
}
return handle , nil
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .