30.12.2024

3 паттерна конкурентного программирования в Golang

ex-Team Lead в Яндекс
Владимир Балун
Для работы с базами данных
Работать с различными хранилищами данных — это ежедневная задача Go-разработчиков. Но что делать, если перед тобой не просто база данных, а сложная система? Например, база данных с репликацией, пошардированная база данных, либо база данных совместно используемая с кэшем.

К счастью, в Go есть паттерны, которые делают такие задачи проще и эффективнее. В этой статье мы разберем 3 популярных паттерна конкурентного программирования, которые помогут справляться со сложностями:
  • Single Flight для работы с кэшем и базой данных;
  • Moving Later для распределенных запросов в кластер баз данных с синхронными репликами;
  • Err Group для работы с кластером базы данных с несколькими шардами.
Еще расскажу, как часто разработчики сталкиваются с такими задачами на собеседованиях и в реальной работе. Данные основаны на опросе 395 специалистов. А в конце статьи тебя ждет подборка видео и статей, чтобы углубить знания о Concurrency в Go

Автор — Владимир Балун, ex-team lead в яндекс

руководил разработкой системы трейсинга (11ГБ/с трафик)
Yandex
разрабатывал системы трейсинга и непрерывного профилирования
Ozon
разрабатывал движок по подбору таргетированной рекламы
Tinkoff
разрабатывал Kaspersky Endpoint Security
Kaspersky Lab
поддерживал ICQ и разрабатывал My Teams
Mail.ru
руководил курсом Golang Developer.Professional
OTUS
спикер конференций
CodeFest и Saint HighLoad++
Откроется после 5-ой главы
Содержание

1. Single Flight: работа с кэшем и базой данных

Представь ситуацию: в MySQL хранятся пользовательские данные, а Redis используется для кэширования данных популярных пользователей. Типичный кейс для разработки.

Для взаимодействия с кэшем часто выбирают подход ленивого кэширования (lazy caching), также известный как cache aside. Именно такой мы и рассмотрим ниже.


Как это работает?

  1. Ищем данные в кэше;
  2. Если в кэше их нет, запрашиваем их из базы данных;
  3. Добавляем полученные данные в кэш;
  4. Отдаем результат пользователю.

Учебный пример реализации без использования отдельных слоев и прочего выглядит так:
type Cache interface {
	Get(ctx context.Context, key string) (any, error)
	Set(ctx context.Context, key string, value any) error
}

type Database interface {
	Query(ctx context.Context, query string, args ...string) (any, error)
}

func GetUserBalance(ctx context.Context, userID string) (any, error) {
	value, err := cache.Get(ctx, userID)
	if err == nil {
		return value, nil
	}

	const query = "SELECT balance FROM users WHERE user_id = ?"
	value, err = database.Query(ctx, query, userID)
	if err != nil {
		return nil, err
	}

	_ = cache.Set(ctx, userID, value)
	return value, err
}
На первый взгляд всё кажется простым и удобным. Однако в условиях больших нагрузок этот код может привести к серьёзным проблемам.

Проблема больших нагрузок

Представь, что система обрабатывает тысячи запросов в секунду (RPS). Если «горячий» ключ в кэше становится недействительным, тысячи пользователей могут одновременно попытаться его получить. Данные не найдутся в кэше, и все эти запросы направятся в базу данных.

Что в результате?
  1. перегрузка базы данных;
  2. возможный сбой или отказ базы данных.

Этот сценарий известен как Thundering Herd Problem. Чтобы справиться с этой проблемой, используется паттерн Single Flight.

Как работает Single Flight?

Single Flight решает проблему конкурентных запросов так:
  1. Когда несколько горутин запрашивают один и тот же ключ, паттерн пропускает только одну из них с запросом в базу данных.
  2. Остальные горутины ожидают, пока первая получит ответ из базы данных и затем получают данные.

Как это реализовать?

В Go есть готовая библиотека для этого паттерна — singleflight. Но для лучшего понимания мы напишем свою примитивную версию с нуля. Это поможет разобраться в деталях работы.
type call struct {
	err   error
	value any
	done  chan struct{}
}

type SingleFlight struct {
	mutex sync.Mutex
	calls map[string]*call
}

func NewSingleFlight() *SingleFlight {
	return &SingleFlight{
		calls: make(map[string]*call),
	}
}

func (s *SingleFlight) Do(ctx context.Context, key string, action func(context.Context) (any, error)) (any, error) {
    s.mutex.Lock()
	if call, found := s.calls[key]; found {
		s.mutex.Unlock()
		return s.wait(ctx, call)
	}

	call := &call{
		done: make(chan struct{}),
	}

	s.calls[key] = call
	s.mutex.Unlock()

	go func() {
		defer func() {
			if v := recover(); v != nil {
				call.err = errors.New("error from single flight")
			}

			close(call.done)

			s.mutex.Lock()
			delete(s.calls, key)
			s.mutex.Unlock()
		}()

		call.value, call.err = action(ctx)
	}()

	return s.wait(ctx, call)
}

func (s *SingleFlight) wait(ctx context.Context, call *call) (any, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-call.done:
		return call.value, call.err
	}
}
Теперь наш код для работы с кэшем и базой данных будет выглядеть так:
type Cache interface {
	Get(ctx context.Context, key string) (any, error)
	Set(ctx context.Context, key string, value any) error
}

type Database interface {
	Query(ctx context.Context, query string, args ...string) (any, error)
}

func GetUserBalance(ctx context.Context, userID string) (any, error) {
	value, err := cache.Get(ctx, userID)
	if err == nil {
		return value, nil
	}

	const query = "SELECT balance FROM users WHERE user_id = ?"
	return singleFlight.Do(ctx, userID, func(ctx context.Context) (any, error) {
		value, err = database.Query(ctx, query, userID)
		if err != nil {
			return nil, err
		}

		_ = cache.Set(ctx, userID, value)
		return value, err
	})
}
Тем не менее, у этого примера все равно еще есть несколько проблем, с которыми можно потенциально столкнуться в реальных условиях:

1. Медленное выполнение первой горутины

Первая горутина, запрашивающая данные из базы данных, может выполняться намного дольше, чем ожидалось. Это может быть связано с:
  • медленным сетевым соединением;
  • временной перегрузкой сервера;
  • другими внешними факторами.

Остальные горутины будут вынуждены ждать завершения первой горутины, что увеличивает общее время выполнения.
Решение: можно использовать таймауты. Таймаут позволяет задать максимальное время выполнения первой горутины. Если она не успела завершиться, её выполнение прерывается и пропускается в хранилище за данными какая-нибудь другая горутина.

2. Устаревшие данные

Данные, полученные из базы данных, могут часто меняться. Это приводит к тому, что к моменту завершения запроса первой горутины результат устаревает.
Решение: сделать ключ недействительным и запустить новый запрос из другой горутины для обновления данных.

2. Moving Later для работы с синхронными репликами в кластере

Представим, что у нас есть кластер базы данных PostgreSQL с синхронной репликацией, где один узел является ведущим (master), а остальные — репликами (slave). Кластер состоит из трех реплик, в нем хранятся данные о заказах, и наша цель — находить заказы по их идентификаторам за минимально-возможное время.

Поскольку у нас в кластере синхронная репликация, можем выполнить такой алгоритм работы с кластером:

1) Параллельные запросы ко всем репликам. Мы можем отправить запросы ко всем репликам одновременно. Это позволяет получить ответ как можно быстрее от той реплики, которая отреагирует первой.
2) Игнорирование лишних ответов. Как только одна из реплик возвращает ответ, мы сразу же отдаём его пользователю. Все остальные ответы от реплик игнорируются, так как «данные во всех репликах одинаковые».
Потенциальная проблема — параллельные запросы создают дополнительную нагрузку на кластер.
В условиях высокой нагрузки или ограниченных ресурсов это может привести к перегрузке кластера, поэтому данный подход стоит применять с осторожностью и только в случаях, где критично минимальное время ответа.

Реализация паттерна Moving Later

Для реализации этого подхода используем паттерн Moving Later. Для демонстрации паттерна используется учебный пример с простым интерфейсом базы данных:
type Database interface {
	Query(query string) string // simple interface for example
}

func DistributedQuery(replicas []Database, query string) string {
	responseCh := make(chan string, 1)
	for _, replica := range replicas {
		go func() {
			select {
			case responseCh <- replica.Query(query):
			default:
				return
			}
		}()
	}

	return <-responseCh
}

func main() {
	replicas := []*PgSQLDatabase{
		NewPgSQLDatabase("127.0.0.1:5432"),
		NewPgSQLDatabase("127.0.0.2:5432"),
		NewPgSQLDatabase("127.0.0.3:5432"),
	}

	response := DistributedQuery(replicas, "query to pgsql...")
	_ = response
}

Зачем нужен буферизированный канал?

Если запись в канал не может пройти сразу, будет выбрана ветка в select по умолчанию. Поэтому выполнение записи без блокировки гарантирует, что ни одна из запущенных в цикле горутин не останется бесконечно висеть и канал можно не закрывать.

Однако, если запись пройдет до того, как выполнение функции DistributedQuery добралось до чтения из канала (крайне маловероятная ситуация, но тем не менее), отправка может завершиться неудачей, потому что никто еще не готов, и для всех горутин в селекте будет выбрана ветка по умолчанию.
Решение: буферизированный канал гарантирует, что запись в него всегда будет успешной, и первое значение будет получено независимо от порядка выполнения.
  • Вопрос:
    Нужно ли закрывать канал в данном примере?
    Ответ:
    Нет, закрывать канал необязательно.

3. Err Group для запросов в кластер с несколькими шардами

Предположим, что у нас есть кластер базы данных ClickHouse, состоящий из нескольких шардов. Также допустим, что у ClickHouse старая версия, которая не поддерживает Distributed Table Engine. В каждом шарде хранятся данные о продажах разных магазинов крупной розничной сети.

Задача — написать распределённый запрос, который будет обращаться ко всем шардам и собирать общую выручку за последнюю неделю со всех магазинов.

Регистрация занимает <1 минуты

Зарегистрируйся на платформе, чтобы продолжить

Статистика, как часто это встречается в работе и на собеседованих

Теория и примеры реализации паттерна Err Group

После регистрации откроются:

Допматериалы и 2 урока по Concurreny: мьютексы и каналы

Следующий урок
Внутреннее устройство каналов в Golang

Внутреннее устройство мьютексов