1. Single Flight: работа с кэшем и базой данных
Как это работает?
type Cache interface {
Get(ctx context.Context, key string) (any, error)
Set(ctx context.Context, key string, value any) error
}
type Database interface {
Query(ctx context.Context, query string, args ...string) (any, error)
}
func GetUserBalance(ctx context.Context, userID string) (any, error) {
value, err := cache.Get(ctx, userID)
if err == nil {
return value, nil
}
const query = "SELECT balance FROM users WHERE user_id = ?"
value, err = database.Query(ctx, query, userID)
if err != nil {
return nil, err
}
_ = cache.Set(ctx, userID, value)
return value, err
}
Проблема больших нагрузок
Как работает Single Flight?
Как это реализовать?
type call struct {
err error
value any
done chan struct{}
}
type SingleFlight struct {
mutex sync.Mutex
calls map[string]*call
}
func NewSingleFlight() *SingleFlight {
return &SingleFlight{
calls: make(map[string]*call),
}
}
func (s *SingleFlight) Do(ctx context.Context, key string, action func(context.Context) (any, error)) (any, error) {
s.mutex.Lock()
if call, found := s.calls[key]; found {
s.mutex.Unlock()
return s.wait(ctx, call)
}
call := &call{
done: make(chan struct{}),
}
s.calls[key] = call
s.mutex.Unlock()
go func() {
defer func() {
if v := recover(); v != nil {
call.err = errors.New("error from single flight")
}
close(call.done)
s.mutex.Lock()
delete(s.calls, key)
s.mutex.Unlock()
}()
call.value, call.err = action(ctx)
}()
return s.wait(ctx, call)
}
func (s *SingleFlight) wait(ctx context.Context, call *call) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-call.done:
return call.value, call.err
}
}
type Cache interface {
Get(ctx context.Context, key string) (any, error)
Set(ctx context.Context, key string, value any) error
}
type Database interface {
Query(ctx context.Context, query string, args ...string) (any, error)
}
func GetUserBalance(ctx context.Context, userID string) (any, error) {
value, err := cache.Get(ctx, userID)
if err == nil {
return value, nil
}
const query = "SELECT balance FROM users WHERE user_id = ?"
return singleFlight.Do(ctx, userID, func(ctx context.Context) (any, error) {
value, err = database.Query(ctx, query, userID)
if err != nil {
return nil, err
}
_ = cache.Set(ctx, userID, value)
return value, err
})
}
1. Медленное выполнение первой горутины
2. Устаревшие данные
2. Moving Later для работы с синхронными репликами в кластере
Реализация паттерна Moving Later
type Database interface {
Query(query string) string // simple interface for example
}
func DistributedQuery(replicas []Database, query string) string {
responseCh := make(chan string, 1)
for _, replica := range replicas {
go func() {
select {
case responseCh <- replica.Query(query):
default:
return
}
}()
}
return <-responseCh
}
func main() {
replicas := []*PgSQLDatabase{
NewPgSQLDatabase("127.0.0.1:5432"),
NewPgSQLDatabase("127.0.0.2:5432"),
NewPgSQLDatabase("127.0.0.3:5432"),
}
response := DistributedQuery(replicas, "query to pgsql...")
_ = response
}
Зачем нужен буферизированный канал?
3. Err Group для запросов в кластер с несколькими шардами
Зачем нам дожидаться всех шардов?
type Database interface {
Query(query string) (string, error) // simple interface for example
}
func DistributedQuery(shards []Database, query string) []string {
var wg sync.WaitGroup
wg.Add(len(shards))
responseCh := make(chan string)
for _, shard := range shards {
go func() {
defer wg.Done()
response, _ := shard.Query(query)
responseCh <- response
}()
}
go func() {
wg.Wait()
close(responseCh)
}()
responses := make([]string, 0, len(shards))
for response := range responseCh {
responses = append(responses, response)
}
return responses
}
func main() {
shards := []*ClickHouseDatabase{
NewClickHouseDatabase("127.0.0.1:5432"),
NewClickHouseDatabase("127.0.0.2:5432"),
NewClickHouseDatabase("127.0.0.3:5432"),
}
response := DistributedQuery(shards, "query to clickhouse...")
_ = response
}
Проблема отказа одного из шардов
Решение с помощью паттерна Err Group
Реализация
type Group struct {
cancel func(error) // this is an anti-pattern, but ...
wg sync.WaitGroup
errOnce sync.Once
err error
}
func NewErrGroup(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Go(action func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := action(); err != nil {
g.errOnce.Do(func() {
g.err = err
g.cancel(g.err)
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
return g.err
}
type Database interface {
Query(query string) (string, error) // simple interface for example
}
func DistributedQuery(shards []Database, query string) ([]string, error) {
var mutex sync.Mutex
responses := make([]string, 0, len(shards))
group, ctx := NewErrGroup(context.TODO()) // nice to have parent context
for _, shard := range shards {
group.Go(func() error {
type result struct {
response string
err error
}
resultCh := make(chan result, 1)
go func() {
response, err := shard.Query(query)
resultCh <- result{response: response, err: err}
}()
select {
case <-ctx.Done():
return ctx.Err()
case result := <-resultCh:
if result.err != nil {
return result.err
}
mutex.Lock()
responses = append(responses, result.response)
mutex.Unlock()
return nil
}
})
}
if err := group.Wait(); err != nil {
return nil, err
} else {
return responses, nil
}
}
func main() {
shards := []*ClickHouseDatabase{
NewClickHouseDatabase("127.0.0.1:5432"),
NewClickHouseDatabase("127.0.0.2:5432"),
NewClickHouseDatabase("127.0.0.3:5432"),
}
response, err := DistributedQuery(shards, "query to clickhouse...")
_ = response
_ = err
}
как часто это встречается в работе и на собеседованиях
Дополнительная литература
Курсы для углубленного изучения
Заключение