1. Single Flight: работа с кэшем и базой данных
Как это работает?
type Cache interface {
Get(ctx context.Context, key string) (any, error)
Set(ctx context.Context, key string, value any) error
}
type Database interface {
Query(ctx context.Context, query string, args ...string) (any, error)
}
func GetUserBalance(ctx context.Context, userID string) (any, error) {
value, err := cache.Get(ctx, userID)
if err == nil {
return value, nil
}
const query = "SELECT balance FROM users WHERE user_id = ?"
value, err = database.Query(ctx, query, userID)
if err != nil {
return nil, err
}
_ = cache.Set(ctx, userID, value)
return value, err
}
Проблема больших нагрузок
Как работает Single Flight?
Как это реализовать?
type call struct {
err error
value any
done chan struct{}
}
type SingleFlight struct {
mutex sync.Mutex
calls map[string]*call
}
func NewSingleFlight() *SingleFlight {
return &SingleFlight{
calls: make(map[string]*call),
}
}
func (s *SingleFlight) Do(ctx context.Context, key string, action func(context.Context) (any, error)) (any, error) {
s.mutex.Lock()
if call, found := s.calls[key]; found {
s.mutex.Unlock()
return s.wait(ctx, call)
}
call := &call{
done: make(chan struct{}),
}
s.calls[key] = call
s.mutex.Unlock()
go func() {
defer func() {
if v := recover(); v != nil {
call.err = errors.New("error from single flight")
}
close(call.done)
s.mutex.Lock()
delete(s.calls, key)
s.mutex.Unlock()
}()
call.value, call.err = action(ctx)
}()
return s.wait(ctx, call)
}
func (s *SingleFlight) wait(ctx context.Context, call *call) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-call.done:
return call.value, call.err
}
}
type Cache interface {
Get(ctx context.Context, key string) (any, error)
Set(ctx context.Context, key string, value any) error
}
type Database interface {
Query(ctx context.Context, query string, args ...string) (any, error)
}
func GetUserBalance(ctx context.Context, userID string) (any, error) {
value, err := cache.Get(ctx, userID)
if err == nil {
return value, nil
}
const query = "SELECT balance FROM users WHERE user_id = ?"
return singleFlight.Do(ctx, userID, func(ctx context.Context) (any, error) {
value, err = database.Query(ctx, query, userID)
if err != nil {
return nil, err
}
_ = cache.Set(ctx, userID, value)
return value, err
})
}
1. Медленное выполнение первой горутины
2. Устаревшие данные
2. Moving Later для работы с синхронными репликами в кластере
Реализация паттерна Moving Later
type Database interface {
Query(query string) string // simple interface for example
}
func DistributedQuery(replicas []Database, query string) string {
responseCh := make(chan string, 1)
for _, replica := range replicas {
go func() {
select {
case responseCh <- replica.Query(query):
default:
return
}
}()
}
return <-responseCh
}
func main() {
replicas := []*PgSQLDatabase{
NewPgSQLDatabase("127.0.0.1:5432"),
NewPgSQLDatabase("127.0.0.2:5432"),
NewPgSQLDatabase("127.0.0.3:5432"),
}
response := DistributedQuery(replicas, "query to pgsql...")
_ = response
}
Зачем нужен буферизированный канал?
3. Err Group для запросов в кластер с несколькими шардами