package rpc
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/go-faster/errors"
"go.uber.org/zap"
"github.com/gotd/td/bin"
"github.com/gotd/td/clock"
)
// Engine tracks in-flight RPC requests: it sends them, re-sends them until
// they are acknowledged, and routes results or errors back to the waiting
// Do call by message ID.
type Engine struct {
// send transmits a request; it is called for the initial attempt and for
// every retry.
send Send
// drop is invoked by Do when the caller's context is done after the
// request has already been sent.
drop DropHandler
// mux guards the rpc map.
// NOTE(review): presumably it guards ack as well; the ack helpers are not
// visible here, so confirm.
mux sync .Mutex
// rpc maps a message ID to the handler that receives its result or error.
rpc map [int64 ]func (*bin .Buffer , error ) error
// ack maps a message ID to the channel the retry loop waits on for an
// acknowledgement.
ack map [int64 ]chan struct {}
// clock supplies the timers used between retries.
clock clock .Clock
log *zap .Logger
// retryInterval is the delay between re-send attempts.
retryInterval time .Duration
// maxRetries is the number of re-sends after which the retry loop gives up.
maxRetries int
// reqCtx is canceled via reqCancel by ForceClose to abort in-flight requests.
reqCtx context .Context
reqCancel context .CancelFunc
// wg counts running Do calls so that Close can wait for them.
wg sync .WaitGroup
// closed is set to 1 by Close and accessed atomically.
closed uint32
}
// New creates an Engine that transmits requests through send and is
// configured by cfg. Defaults are applied to cfg before it is read, and the
// effective retry settings are logged.
func New(send Send, cfg Options) *Engine {
	cfg.setDefaults()

	cfg.Logger.Info("Initialized",
		zap.Duration("retry_interval", cfg.RetryInterval),
		zap.Int("max_retries", cfg.MaxRetries),
	)

	engine := &Engine{
		send:          send,
		drop:          cfg.DropHandler,
		rpc:           make(map[int64]func(*bin.Buffer, error) error),
		ack:           make(map[int64]chan struct{}),
		clock:         cfg.Clock,
		log:           cfg.Logger,
		retryInterval: cfg.RetryInterval,
		maxRetries:    cfg.MaxRetries,
	}
	// The engine-wide context lets ForceClose abort every in-flight request.
	engine.reqCtx, engine.reqCancel = context.WithCancel(context.Background())

	return engine
}
// Request describes a single RPC call handled by Engine.Do.
type Request struct {
// MsgID identifies the request; result handlers are registered under it.
MsgID int64
// SeqNo is passed to the send function together with MsgID.
SeqNo int32
// Input is the payload handed to the send function.
Input bin .Encoder
// Output decodes the response buffer when a result arrives.
Output bin .Decoder
}
// Do sends req and blocks until a result or error is delivered for
// req.MsgID, ctx is done, or the engine is forcibly closed.
//
// It returns ErrEngineClosed if the engine was already closed, the wrapped
// retryUntilAck error if sending/retrying failed for a reason other than
// retryCtx's own error, ctx.Err() if ctx is done first, a wrapped reqCtx
// error on forced close, and otherwise the handler's result (the RPC error
// or the outcome of decoding into req.Output).
func (e *Engine ) Do (ctx context .Context , req Request ) error {
if e .isClosed () {
return ErrEngineClosed
}
// Register this call so Close can wait for it to finish.
e .wg .Add (1 )
defer e .wg .Done ()
// retryCtx stops the retry loop either when Do returns or as soon as the
// handler below has been invoked.
retryCtx , retryClose := context .WithCancel (ctx )
defer retryClose ()
log := e .log .With (zap .Int64 ("msg_id" , req .MsgID ))
log .Debug ("Do called" )
// done is closed by the handler once resultErr has been set.
done := make (chan struct {})
var (
// resultErr is the handler's outcome; it is read only after done is closed.
resultErr error
// handlerCalled guards against the handler running more than once.
handlerCalled uint32
)
handler := func (rpcBuff *bin .Buffer , rpcErr error ) error {
log .Debug ("Handler called" )
// Only the first invocation proceeds; later ones are rejected.
if ok := atomic .CompareAndSwapUint32 (&handlerCalled , 0 , 1 ); !ok {
log .Warn ("Handler already called" )
return errors .New ("handler already called" )
}
// Deferred calls run in reverse order: done is closed first, then the
// retry context is canceled.
defer retryClose ()
defer close (done )
if rpcErr != nil {
// An RPC error is the result of the call, not a handler failure.
resultErr = rpcErr
return nil
}
// A decode failure is both stored as the result and reported to the notifier.
resultErr = req .Output .Decode (rpcBuff )
return resultErr
}
// Make the handler reachable from NotifyResult/NotifyError.
e .mux .Lock ()
e .rpc [req .MsgID ] = handler
e .mux .Unlock ()
// Whatever is registered under this message ID is removed when Do returns.
defer func () {
e .mux .Lock ()
delete (e .rpc , req .MsgID )
e .mux .Unlock ()
}()
sent , err := e .retryUntilAck (retryCtx , req )
// An error matching retryCtx.Err() is not returned here; the select below
// decides the outcome in that case.
if err != nil && !errors .Is (err , retryCtx .Err ()) {
return errors .Wrap (err , "retryUntilAck" )
}
select {
case <- ctx .Done ():
// Nothing was sent, so there is nothing to drop.
if !sent {
return ctx .Err ()
}
// Replace the handler with a no-op so a late response is ignored.
// Note: the closure's parameter e shadows the receiver inside it.
e .mux .Lock ()
e .rpc [req .MsgID ] = func (b *bin .Buffer , e error ) error { return nil }
e .mux .Unlock ()
// Dropping is best-effort: ctx.Err() is returned whether or not it succeeds.
if err := e .drop (req ); err != nil {
log .Info ("Failed to drop request" , zap .Error (err ))
return ctx .Err ()
}
log .Debug ("Request dropped" )
return ctx .Err ()
case <- e .reqCtx .Done ():
return errors .Wrap (e .reqCtx .Err (), "engine forcibly closed" )
case <- done :
return resultErr
}
}
// retryUntilAck sends req once and then re-sends it every retryInterval until
// the acknowledgement channel for req.MsgID fires, ctx is done, the engine is
// forcibly closed, a re-send fails, or maxRetries re-sends have been made.
//
// sent is false only when the initial send fails; in every other case it is
// true, even if err is non-nil.
//
// err is:
//   - the wrapped send error if the initial send fails;
//   - nil once the request is acknowledged, or when a re-send fails with
//     context.Canceled;
//   - ctx.Err() if ctx is done;
//   - the wrapped reqCtx error if the engine is forcibly closed;
//   - the re-send error for any other re-send failure;
//   - *RetryLimitReachedErr after maxRetries re-sends.
func (e *Engine) retryUntilAck(ctx context.Context, req Request) (sent bool, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		ackChan = e.waitAck(req.MsgID)
		retries = 0
		log     = e.log.Named("retry").With(zap.Int64("msg_id", req.MsgID))
	)
	// Release the ack registration however this function exits.
	defer e.removeAck(req.MsgID)

	// Initial attempt.
	if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil {
		return false, errors.Wrap(err, "send")
	}

	loop := func() error {
		timer := e.clock.Timer(e.retryInterval)
		defer clock.StopTimer(timer)

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-e.reqCtx.Done():
				return errors.Wrap(e.reqCtx.Err(), "engine forcibly closed")
			case <-ackChan:
				log.Debug("Acknowledged")
				return nil
			case <-timer.C():
				// Re-arm before sending so the next interval starts now.
				timer.Reset(e.retryInterval)

				log.Debug("Acknowledge timed out, performing retry")
				if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil {
					// Cancellation during a re-send is not reported as a failure.
					if errors.Is(err, context.Canceled) {
						return nil
					}

					log.Error("Retry failed", zap.Error(err))
					return err
				}

				retries++
				if retries >= e.maxRetries {
					// The logger already carries msg_id (see With above), so it
					// is not attached again; doing so would emit a duplicate key.
					log.Error("Retry limit reached")
					return &RetryLimitReachedErr{
						Retries: retries,
					}
				}
			}
		}
	}

	return true, loop()
}
// NotifyResult delivers the response buffer b to the handler registered for
// msgID and returns whatever the handler returns. If no handler is
// registered, a warning is logged and nil is returned.
func (e *Engine) NotifyResult(msgID int64, b *bin.Buffer) error {
	// Look the handler up under the lock, but invoke it after unlocking.
	e.mux.Lock()
	handler, registered := e.rpc[msgID]
	e.mux.Unlock()

	if registered {
		return handler(b, nil)
	}

	e.log.Warn("rpc callback not set", zap.Int64("msg_id", msgID))
	return nil
}
// NotifyError delivers rpcErr to the handler registered for msgID. If no
// handler is registered, a warning is logged. The handler's return value is
// discarded.
func (e *Engine) NotifyError(msgID int64, rpcErr error) {
	// Look the handler up under the lock, but invoke it after unlocking.
	e.mux.Lock()
	handler, registered := e.rpc[msgID]
	e.mux.Unlock()

	if registered {
		_ = handler(nil, rpcErr)
		return
	}

	e.log.Warn("rpc callback not set", zap.Int64("msg_id", msgID))
}
// isClosed reports whether the closed flag has been set to 1.
func (e *Engine) isClosed() bool {
	state := atomic.LoadUint32(&e.closed)
	return state == 1
}
// Close marks the engine as closed, so that subsequent Do calls return
// ErrEngineClosed, and then blocks until all running Do calls have returned.
func (e *Engine) Close() {
	const closedState uint32 = 1

	atomic.StoreUint32(&e.closed, closedState)
	e.log.Info("Close called")

	// Wait for every in-flight Do to finish.
	e.wg.Wait()
}
// ForceClose cancels the engine-wide request context, which aborts in-flight
// Do calls, and then performs a regular Close.
func (e *Engine) ForceClose() {
	abortRequests := e.reqCancel
	abortRequests()

	e.Close()
}
The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.