// Package pool implements a pool of Telegram connections.
package pool

// NOTE(review): the import paths below were absent from the reviewed text and
// are restored from the package qualifiers used in this file (atomic.NewBool
// and Int64.Inc, errors.Wrap, zap.Int64, tdsync.NewSupervisor, bin.Encoder).
// Confirm them against the module's go.mod.
import (
	"context"
	"sync"

	"github.com/go-faster/errors"
	"go.uber.org/atomic"
	"go.uber.org/zap"

	"github.com/gotd/td/bin"
	"github.com/gotd/td/tdsync"
)

// DC represents connection pool to one data center.
type DC struct {
	id int

	// Connection constructor.
	newConn func() Conn

	// Wrappers for external world, like logs or PRNG.
	log *zap.Logger // immutable

	// DC context, derived from the context passed to NewDC. Canceled by Close.
	ctx    context.Context    // immutable
	cancel context.CancelFunc // immutable

	// Connections supervisor.
	grp *tdsync.Supervisor
	// Free connections.
	free []*poolConn
	// Total connections.
	total int64
	// Connection id monotonic counter.
	nextConn atomic.Int64
	// Pending requests for a free connection (3rd acquire case).
	freeReq *reqMap
	// DC mutex.
	mu sync.Mutex

	// Limit of connections. Values below 1 mean unlimited.
	max int64 // immutable

	// Signal connection for cases when all connections are dead, but all requests waiting for
	// free connection in 3rd acquire case.
	stuck *tdsync.ResetReady

	// Set by Close; checked by Invoke.
	closed atomic.Bool
}

// NewDC creates new uninitialized DC.
//
// The DC gets its own cancellable context derived from ctx, id is stored as
// the DC identifier, newConn is used to construct every pooled connection and
// opts supplies the logger and the connection limit after its defaults are
// applied.
func NewDC(ctx context.Context, id int, newConn func() Conn, opts DCOptions) *DC {
	ctx, cancel := context.WithCancel(ctx)

	opts.setDefaults()
	return &DC{
		id:      id,
		newConn: newConn,
		log:     opts.Logger,
		ctx:     ctx,
		cancel:  cancel,
		// NOTE(review): the supervisor argument was missing from the reviewed
		// text; the DC context is assumed so that Close's cancel() lets
		// grp.Wait() return — confirm against tdsync.NewSupervisor.
		grp:     tdsync.NewSupervisor(ctx),
		freeReq: newReqMap(),
		max:     opts.MaxOpenConnections,
		stuck:   tdsync.NewResetReady(),
	}
}

// createConnection builds a new pooled connection with the given id and
// starts its Run loop under the DC supervisor. When Run returns, the
// connection is reported to dead together with the error Run returned.
func (c *DC) createConnection(id int64) *poolConn {
	conn := &poolConn{
		Conn:    c.newConn(),
		id:      id,
		dc:      c,
		deleted: atomic.NewBool(false),
		dead:    tdsync.NewReady(),
	}

	c.grp.Go(func(groupCtx context.Context) (err error) {
		// The closure is required: arguments of a deferred call are evaluated
		// when the defer statement executes, so `defer c.dead(conn, err)`
		// would always pass a nil error.
		defer func() { c.dead(conn, err) }()
		return conn.Run(groupCtx)
	})

	return conn
}

// dead removes connection r from the pool accounting. It is idempotent per
// connection: only the first call for a given connection has any effect.
// deadErr is only used for logging and may be nil.
//
// Panics if the total connection counter would become negative.
func (c *DC) dead(r *poolConn, deadErr error) {
	if r.deleted.Swap(true) {
		return // Already deleted.
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.total--
	remaining := c.total
	if remaining < 0 {
		panic("unreachable: remaining can't be less than zero")
	}

	idx := -1
	for i, conn := range c.free {
		// Search connection by id.
		if conn.id == r.id {
			idx = i
			break
		}
	}

	if idx >= 0 {
		// Delete by index from slice tricks.
		copy(c.free[idx:], c.free[idx+1:])
		// Delete reference to prevent resource leaking.
		c.free[len(c.free)-1] = nil
		c.free = c.free[:len(c.free)-1]
	}

	r.dead.Signal()
	c.stuck.Reset()

	c.log.Debug("Connection died",
		zap.Int64("remaining", remaining),
		zap.Int64("conn_id", r.id),
		zap.Error(deadErr),
	)
}

// pop takes the most recently released connection from the free list.
// It reports false if the list is empty. It does not lock c.mu itself;
// acquire calls it while holding the lock.
func (c *DC) pop() (r *poolConn, ok bool) {
	if len(c.free) == 0 {
		return nil, false
	}

	last := len(c.free) - 1
	r = c.free[last]
	// Delete reference to prevent resource leaking, as dead does.
	c.free[last] = nil
	c.free = c.free[:last]

	return r, true
}

// release returns connection r to the pool. If there is a pending request for
// a free connection, r is handed over to it directly; otherwise r is appended
// to the free list. A nil r is ignored.
func (c *DC) release(r *poolConn) {
	if r == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.freeReq.transfer(r) {
		c.log.Debug("Transfer connection to requester", zap.Int64("conn_id", r.id))
		return
	}
	c.log.Debug("Connection released", zap.Int64("conn_id", r.id))
	c.free = append(c.free, r)
}

// errDCIsClosed is returned by Invoke when the DC is already closed.
var errDCIsClosed = errors.New("DC is closed")

// acquire obtains a connection from the pool, in order of preference:
// re-using a free one, creating a new one if the limit allows, or waiting
// until another caller releases one.
//
// It returns ctx.Err() if ctx is done, or the DC context error wrapped with
// "DC closed" if the DC context is done.
func (c *DC) acquire(ctx context.Context) (r *poolConn, err error) { // nolint:gocyclo
retry:
	c.mu.Lock()
	// 1st case: have free connections.
	if conn, ok := c.pop(); ok {
		c.mu.Unlock()
		select {
		case <-conn.Dead():
			// Free connection died while idle; drop it and try again.
			c.dead(conn, nil)
			goto retry
		default:
		}
		c.log.Debug("Re-using free connection", zap.Int64("conn_id", conn.id))
		return conn, nil
	}

	// 2nd case: no free connections, but can create one.
	// c.max < 1 means unlimited
	if c.max < 1 || c.total < c.max {
		c.total++
		c.mu.Unlock()

		id := c.nextConn.Inc()
		c.log.Debug("Creating new connection",
			zap.Int64("conn_id", id),
		)
		conn := c.createConnection(id)

		// NOTE(review): when one of the contexts is done here, conn is neither
		// returned nor released; it stays counted in c.total until its Run
		// loop exits. Confirm this is intended.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-c.ctx.Done():
			return nil, errors.Wrap(c.ctx.Err(), "DC closed")
		case <-conn.Ready():
			return conn, nil
		case <-conn.Dead():
			c.dead(conn, nil)
			goto retry
		}
	}

	// 3rd case: no free connections, can't create yet one, wait for free.
	key, ch := c.freeReq.request()
	c.mu.Unlock()
	c.log.Debug("Waiting for free connect", zap.Int64("request_id", int64(key)))

	select {
	case conn := <-ch:
		c.log.Debug("Got connection for request",
			zap.Int64("conn_id", conn.id),
			zap.Int64("request_id", int64(key)),
		)
		return conn, nil
	case <-c.stuck.Ready():
		c.log.Debug("Some connection dead, try to create new connection, cancel waiting")

		c.freeReq.delete(key)
		// A connection may have been transferred just before the request was
		// deleted; use it instead of losing it.
		select {
		default:
		case conn, ok := <-ch:
			if ok && conn != nil {
				return conn, nil
			}
		}

		goto retry
	case <-ctx.Done():
		err = ctx.Err()
	case <-c.ctx.Done():
		err = errors.Wrap(c.ctx.Err(), "DC closed")
	}

	// Executed only if at least one of context is Done.
	c.freeReq.delete(key)
	// Give back a connection that was transferred concurrently with cancellation.
	select {
	default:
	case conn, ok := <-ch:
		if ok && conn != nil {
			c.release(conn)
		}
	}

	return nil, err
}

// Invoke sends MTProto request using one of pool connection.
//
// It returns errDCIsClosed if the DC is closed. Acquire failures matching
// ErrConnDead are retried; any other acquire error is returned wrapped with
// "acquire connection", and a failed request is returned wrapped with
// "invoke pool". The connection is released back to the pool in both the
// success and the failure case.
func (c *DC) Invoke(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
	if c.closed.Load() {
		return errDCIsClosed
	}

	for {
		conn, err := c.acquire(ctx)
		if err != nil {
			if errors.Is(err, ErrConnDead) {
				continue
			}
			return errors.Wrap(err, "acquire connection")
		}

		c.log.Debug("DC Invoke")
		err = conn.Invoke(ctx, input, output)
		c.release(conn)
		if err != nil {
			c.log.Debug("DC Invoke failed", zap.Error(err))
			return errors.Wrap(err, "invoke pool")
		}

		c.log.Debug("DC Invoke complete")
		return nil
	}
}

// Close marks the DC as closed, cancels the DC context and waits for the
// connections supervisor to finish, returning the supervisor's result.
// It returns an error if the DC was already closed.
func (c *DC) Close() error {
	if c.closed.Swap(true) {
		return errors.New("DC already closed")
	}
	c.log.Debug("Closing DC")
	defer c.log.Debug("DC closed")

	c.cancel()
	return c.grp.Wait()
}