// Package pool contains Telegram connections pool.
package pool import ( ) // DC represents connection pool to one data center. type DC struct { id int // Connection constructor. newConn func() Conn // Wrappers for external world, like logs or PRNG. log log.Helper // immutable // DC context. Will be canceled by Run on exit. ctx context.Context // immutable cancel context.CancelFunc // immutable // Connections supervisor. grp *tdsync.Supervisor // Free connections. free []*poolConn // Total connections. total int64 // Connection id monotonic counter. nextConn atomic.Int64 freeReq *reqMap // DC mutex. mu sync.Mutex // Limit of connections. max int64 // immutable // Signal connection for cases when all connections are dead, but all requests waiting for // free connection in 3rd acquire case. stuck *tdsync.ResetReady closed atomic.Bool } // NewDC creates new uninitialized DC. func ( context.Context, int, func() Conn, DCOptions) *DC { , := context.WithCancel() .setDefaults() return &DC{ id: , newConn: , log: log.For(.Logger), ctx: , cancel: , grp: tdsync.NewSupervisor(), freeReq: newReqMap(), max: .MaxOpenConnections, stuck: tdsync.NewResetReady(), } } func ( *DC) ( int64) *poolConn { := &poolConn{ Conn: .newConn(), id: , dc: , deleted: atomic.NewBool(false), dead: tdsync.NewReady(), } .grp.Go(func( context.Context) ( error) { defer .dead(, ) return .Run() }) return } func ( *DC) ( *poolConn, error) { := .ctx if .deleted.Swap(true) { return // Already deleted. } .mu.Lock() defer .mu.Unlock() .total-- := .total if < 0 { panic("unreachable: remaining can't be less than zero") } := -1 for , := range .free { // Search connection by pointer. if .id == .id { = } } if >= 0 { // Delete by index from slice tricks. copy(.free[:], .free[+1:]) // Delete reference to prevent resource leaking. .free[len(.free)-1] = nil .free = .free[:len(.free)-1] } .dead.Signal() .stuck.Reset() .log.Debug(, "Connection died", log.Int64("remaining", ), log.Int64("conn_id", .id), log.Error(), ) } func ( *DC) () ( *poolConn, bool) { := len(.free) if > 0 { , .free = .free[-1], .free[:-1] return , true } return } func ( *DC) ( *poolConn) { if == nil { return } := .ctx .mu.Lock() defer .mu.Unlock() if .freeReq.transfer() { .log.Debug(, "Transfer connection to requester", log.Int64("conn_id", .id)) return } .log.Debug(, "Connection released", log.Int64("conn_id", .id)) .free = append(.free, ) } var errDCIsClosed = errors.New("DC is closed") func ( *DC) ( context.Context) ( *poolConn, error) { // nolint:gocyclo : .mu.Lock() // 1st case: have free connections. if , := .pop(); { .mu.Unlock() select { case <-.Dead(): .dead(, nil) goto default: } .log.Debug(, "Re-using free connection", log.Int64("conn_id", .id)) return , nil } // 2nd case: no free connections, but can create one. // c.max < 1 means unlimited if .max < 1 || .total < .max { .total++ .mu.Unlock() := .nextConn.Inc() .log.Debug(, "Creating new connection", log.Int64("conn_id", ), ) := .createConnection() select { case <-.Done(): return nil, .Err() case <-.ctx.Done(): return nil, errors.Wrap(.ctx.Err(), "DC closed") case <-.Ready(): return , nil case <-.Dead(): .dead(, nil) goto } } // 3rd case: no free connections, can't create yet one, wait for free. , := .freeReq.request() .mu.Unlock() .log.Debug(, "Waiting for free connect", log.Int64("request_id", int64())) select { case := <-: .log.Debug(, "Got connection for request", log.Int64("conn_id", .id), log.Int64("request_id", int64()), ) return , nil case <-.stuck.Ready(): .log.Debug(, "Some connection dead, try to create new connection, cancel waiting") .freeReq.delete() select { default: case , := <-: if && != nil { return , nil } } goto case <-.Done(): = .Err() case <-.ctx.Done(): = errors.Wrap(.ctx.Err(), "DC closed") } // Executed only if at least one of context is Done. .freeReq.delete() select { default: case , := <-: if && != nil { .release() } } return nil, } // Invoke sends MTProto request using one of pool connection. func ( *DC) ( context.Context, bin.Encoder, bin.Decoder) error { if .closed.Load() { return errDCIsClosed } for { , := .acquire() if != nil { if errors.Is(, ErrConnDead) { continue } return errors.Wrap(, "acquire connection") } .log.Debug(, "DC Invoke") = .Invoke(, , ) if != nil && errRetryableOnNewConn() && .Err() == nil { // Connection died before the request was processed by the // server, so it is safe to retry it on another connection. // // Mark connection as dead instead of releasing, so it is not // re-acquired and waiters are unblocked to create a new one. .dead(, ) .log.Debug(, "DC Invoke failed on dead connection, retrying", log.Int64("conn_id", .id), log.Error(), ) continue } .release() if != nil { .log.Debug(, "DC Invoke failed", log.Error()) return errors.Wrap(, "invoke pool") } .log.Debug(, "DC Invoke complete") return } } // Close waits while all ongoing requests will be done or until given context is done. // Then, closes the DC. func ( *DC) () error { if .closed.Swap(true) { return errors.New("DC already closed") } := .ctx .log.Debug(, "Closing DC") defer .log.Debug(, "DC closed") .cancel() return .grp.Wait() }