Apache Zookeeper. Паттерны использования.

Дащинский Иван

Zookeeper применение

  • Отслеживание и обнаружения сервисов (Service discovery)

  • Динамическая конфигурация

  • Выбор лидера

  • Распределенные блокировки, семафоры и проч.

Zookeeper основы.

Обозначения

  1. Эпоха — \(E := \{e_i : i \in N, e_i < e_j \}\)

  2. Лидер — \(\Lambda:= \{\lambda_{e_1}... \lambda_{e_N}\} : \lambda_{e_i} \prec \lambda_{e_j} \quad iff \quad e_i < e_j \)

  3. Идентификатор транзакции — \(zxid := <e_i, j>, j \in N \)

  4. Кворум — \(Q \subseteq {s_1, s_2, ... s_N} : |Q| > N/2\) Транзакции распространяются лидером и фиксируются серверами.

Strong leadership

  1. В любой момент времени может быть только один лидер

  2. Запросы на изменения данных перенаправляются и обрабатываются только лидером

  3. Запросы на изменения данных подтверждаются кворумом

  4. Перед подтверждением, транзакция записывается каждым сервером в транзакционный журнал на диск (transactional log)

  5. Периодически происходит запись состояния реплицированной структуры данных в персистентную память (snapshot)

Диаграмма состояний сервера

zk service state

ZAB. Лидер (LEADER)

  1. При получении запроса на изменение от клиента или последователя:

    1. Генерирует предложение (PROPOSAL) и отправляет последователям.

    2. Синхронизирует данные в журнал транзакции и генерирует подтверждение от себя (ACK)

  2. При получении подтверждений (ACK) от кворума отсылает запрос COMMIT и изменяет у себя данные

leader zab

ZAB. Последователь (FOLLOWER)

  1. При получении запроса на изменения от клиента:

    1. Перенаправляет запросы на изменения лидеру

    2. При получении COMMIT меняет структуру данных

  2. При получении предложения от лидера (PROPOSAL):

    1. Синхронизирует данные в журнал транзакций

    2. Шлет подтверждение лидеру (ACK)

follower zab 1
follower zab 2

Выборы лидера (LEADER ELECTION)

  1. Создаем свой голос \(v := \langle S_{id}, zxid\rangle\)

  2. Отправляем голос другим серверам

  3. При получении голоса \(v' := \langle S'_{id}, zxid'\rangle\) от другого сервера

    1. Если \(zxid > zxid' \lor zxid = zxid', S_{id} > S'_{id}\) то отправляем свой голос повторно

    2. Иначе меняем свой голос на \(v'\)

  4. Продолжаем выборы пока не получим от всех серверов одинаковые голоса.

Синхронизация

  1. Последователи от нового лидера получают

    1. DIFF или TRUNC если нужно применить разность или отбросить транзакции с определенной \(zxid\)

    2. SNAP если последователь сильно отстал и надо применить полный снимок данных с лидера

ZAB (Гарантии)

  • Если \(zxid < zxid'\) и на каком-то произвольном сервере зафиксирована \(tx_{zxid'}\), то на этом сервере зафиксирована \(tx_{zxid}\), причем \(tx_{zxid} \prec tx_{zxid'}\).

Клиентские сессии

  • У клиентской сессии есть длительность (session timeout)

  • Клиентские сессии отслеживаются лидером

  • Клиентские сессии синхронизируются от последователя к лидеру каждые \(tick / 2\)

  • Клиент при отсутствии активности шлет пустые запросы на сервер, обновляя сессию.

  • Клиент при переподключении к тому же или другому серверу может восстановить сессию в течении session timeout.

  • Данные, привязанные к сессии очищаются лидером по инвалидации сессии.

Клиентское API

Структура данных

Иерархическая древовидная структура данных:

  • Узел дерева содержит массив данных, пути к дочерним узлам, структуру Stat

  • По факту представляет собой хэш таблицу узлов + служебные структуры данных

Типы узлов:

  • EPHEMERAL

  • EPHEMERAL_SEQUENTIAL

  • PERSISTENT

  • PERSISTENT_SEQUENTIAL

API

class Zookeeper {
    String create(String path, byte data[], List<ACL> acl, CreateMode createMode);
    Stat setData(String path, byte data[], int version);
    void delete(String path, int version);

    Stat exists(String path, Watcher watcher);
    List<String> getChildren(String path, Watcher watcher, Stat stat);
    byte[] getData(String path, Watcher watcher, Stat stat);
}

Отслеживание изменений (Watcher)

class WatchedEvent {
    KeeperState keeperState;
    EventType eventType;
    String path;
}

interface Watcher {
    void process(WatchedEvent event);
    interface Event {
        static enum EventType {
            None(-1),
            NodeCreated(1),
            NodeDeleted(2),
            NodeDataChanged(3),
            NodeChildrenChanged(4),
            DataWatchRemoved(5),
            ChildWatchRemoved(6);
        }
        static enum KeeperState {
            Unknown(-1),
            Disconnected(0),
            NoSyncConnected(1),
            SyncConnected(3),
            AuthFailed(4),
            ConnectedReadOnly(5),
            SaslAuthenticated(6),
            Expired(-112),
            Closed(7);
        }
    }
}

Отслеживание изменений (Watcher)

  • Изменения отправляются только один раз (требуется переподписка)

  • При переподключении клиента требуется переподписка

  • Гарантируется, что если \(tx \prec tx'\), то придет оповещение \(W_{tx}\), а не \(W_{tx'}\).

"Введение" в GO

  • Сопрограммы (goroutines)

    • Выполняются поверх нитей ОС

    • Имеют свой стек

    • Планировщик внутри runtime

  • Каналы

    • Hoare, C. A. R. (1985). Communicating Sequential Processes. Prentice Hall. ISBN 978-0-13-153289-2.

ch := make(chan string)
go func() {
    for i := 0; i < 10; i++ {
        ch <- strconv.Itoa(i) // Отправляем данные в канал
        time.Sleep(1 * time.Second)
    }
}() // Запускаем анонимную горутину

for {
    select { (3)
        case i := <- ch: // Читаем данные из канала
            fmt.Println("Got number: %s", i)
            continue
        case <- closeCh:
        	return
    }
}

API GO-style

// Запросы на чтение
func (client *ZkClient) Exists(path string) (ok bool, stat *zk.Stat, err error)
func (client *ZkClient) Get(path string) (data []byte, stat *zk.Stat, err error)
func (client *ZkClient) Children(path string) (children []string, stat *zk.Stat, err error)

// Запросы с подпиской.
func (client *ZkClient) ExistsW(path string) (ok bool, stat *zk.Stat, evt <-chan zk.Event, err error)
func (client *ZkClient) GetW(path string) (data []byte, stat *zk.Stat, evt <-chan zk.Event, err error)
func (client *ZkClient) ChildrenW(path string) (children []string, stat *zk.Stat, evt <-chan zk.Event, err error)

// Запросы на изменение
func (client *ZkClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (res string, err error)
func (client *ZkClient) Set(path string, data []byte, version int32) (stat *zk.Stat, err error)
func (client *ZkClient) Delete(path string, version int32) (err error)

Получение данных и подписка

path := "/some_node"
for {
    data, _, evt, err := cli.GetW(path) // Получение данных из /some_node с подпиской
    if err == zk.ErrNoNode { // Если /some_node не существует, продолжим цикл сначала
        continue
    } else if err != nil {
        panic(err)
    }
    fmt.Printf("Got new value %s", string(data))
    select {
    case e := <-evt:
        if e.Type != zk.EventNodeDataChanged {
            fmt.Printf("Unexpected type of event %s\n", e.Type)
        }
        continue //  Переходим в начало цикла.
    }
}

Рецепты

Мьютекс (naive)

func (l *Lock) Lock() error {
    path := l.CreateLockNode() //Создаем EPHEMERAL_SEQUENTIAL -lock/guid-00000x
    order := parseSeq(path) // Определяем номер из path znode, созданной на пред. шаге

    for {
        children, _, evt, _ := l.zk_client.ChildrenW(l.path) // Берем дочерние узлы и ставим watch (канал evt)
        lowestOrder := process(children)

        if order == lowestOrder { // Если у нашего узла самый низкий номер, мы захватили лок
            break
        }

        ev := <-evt // Приостанавливаем сопрограмму пока не придет событие из канала evt
        if ev.Err != nil {
            return ev.Err
        }
    }
    l.lock = path
    return nil
}

func (l *Lock) Unlock() {
    _ = l.zk_client.Delete(l.lock, -1) // Отпуская мьютекс, просто удалем znode
}

Что не так?

thundering herd
Figure 1. Thundering herd problem

Мьютекс

func (l *Lock) Lock() error {
    path := l.CreateLockNode()
    order := parseSeq(path)

    for {
        children, _, _ := l.zk_client.Children(l.path) // Берем дочерние узлы без отслеживания
        lowestOrder, prevOrderPath := process(children, order)

        if order == lowestOrder {
            break
        }

        exists, _, ch, err := l.zk_client.ExistsW(l.path + "/" + prevOrderPath) // Берем предыдущую по порядку znode
        if err != nil {
            return err
        } else if !exists { // Если она была удалена, переходим в начало цикла
            continue
        }

        ev := <-ch // Иначе приостанавливаем сопрограмму пока не придет событие в канал
        if ev.Err != nil {
            return ev.Err
        }
    }
    l.lock = path
    return nil
}

Выбор лидера

func NewService() *Service {
    service := &Service{
        ....
    }
    service.localNodePath = createServiceNode() // Создаем znode EPHEMERAL_SEQUENTIAL с именем _services/guid-0000x
    go loop()
    return &service
}

func (service *Service) loop() {
    LOOP:
    for {
        isLeader, changeLeaderChan, err := service.checkLeader(service.localNodePath) // Проверяем кто сейчас лидер.

        if isLeader {
            go service.lead() // Если текущий сервис лидер, запускаем сопрограмму lead
        } else {
            go service.follow() // Иначе сопрограмму follow
        }

        for {
            select {
            case e := <-changeLeaderChan: // Отслеживаем события из канала, на который создали ранее
                switch e.Type {
                case zk.EventNodeDeleted: // Если отслеживаемая znode была удалена, переходим в начало цикла
                    continue LOOP
                default: // Иначе переподписываемся (заодно проверяем что znode не была удалена после события)
                    var exists bool
                    exists, _, changeLeaderChan, _ = service.zk_client.ExistsW(e.Path)

                    if !exists {
                        continue LOOP
                    }
                    continue
                }
            }
        }
    }
}

Выбор лидера

func (service *Service) checkLeader(nodePath string) (bool, ev <-chan zk.Event, error) {
    for {
    	children, _, err := disco.client.Children(alivePath) // Берем все дочернии узлы без отслеживания
        order := parseSeq(nodePath)
        lowestOrder, previousOrderPath := process(children, order)

        if lowestOrder == order { // Если у текущего процесса самый низкий порядок, он лидер
            return true, nil, nil
        } else {
            exists, _, ev, err = service.zk_client.ExistsW(previousOrderPath) // Иначе отслеживаем предыдущий по порядку
            if !exists {
                continue
            } else {
                return false, ev, nil
            }
        }
    }
}

Рецепты это сложно

  • Что делать?

  • Используйте Apache Curator (Если Java)

  • Если не Java, ищите готовые аналоги.

Кейс Apache Ignite

TcpDiscovery

disco ring

TcpDiscovery

  • Линейное время обхода.

  • Проблема на 1 узле — проблема на всем кластере.

disco ring 1 trouble
disco ring all trouble

Гарантии Discovery в Ignite

  • В один момент времени один лидер (координатор)

  • Упорядоченность событий

  • Гарантируется доставка сообщений до всех узлов кластера

ZookeeperDisсovery

  • Оповещаются узлы независимо.

  • Проблемы на 1 узле другие узлы не затрагивает.

zk ignite cluster
zk ignite cluster trouble

ZookeeperDiscovery. Схема

zk znode structure

ZookeeperDiscovery

  • Каждый узел регистрирует /<root>/n/guid-0000x (EPHEMERAL_SEQUENTIAL)

  • Серверные узлы участвуют в выборе лидера(координатора) (см. рецепт Leader Election)

ZookeeperDiscovery. События

  • Все события генерирует только координатор

  • События сохраняются в /<root>/e в виде

    • Упорядоченный список событий

    • Версия топологии

    • Номер последнего события и номер последнего сообщения

  • Остальные узлы кластера отслеживают изменения в этой znode.

  • По обработке события узлы сохраняют номер последнего обработанного события в /<root>/n/guid-0000x

  • Координатор отслеживает изменения данных в каждой znode в /<root>/n и удаляет из списка события, которые уже обработаны всеми узлами кластера.

ZookeeperDiscovery. Вход узла в топологию.

  • При входе узла, узел создает

    • Собственную znode /<root>/n/guid-0000x (EPHEMERAL_SEQUENTIAL)

    • Метаданные, которыми узел делится с кластером /<root>/jd/<guid>

  • Координатор отслеживает изменения дочерних узлов в /<root>/n

    • При добавлении узла создается новое событие

    • Координатор сохраняет для нового узла метаинформацию и текущую топологию кластера в /<root>/e/fj-<evtid>

  • Все узлы кластера обрабатывают вход нового узла и обрабатывают сохраненные метаданные этого узла

  • Новый узел обрабатывает событие и по его идентификатору обрабатывает метаданные кластера из /<root>/e/fj-<evtid>

  • После обработки всеми узлами кластера события, координатор удаляет ненужные данные ( /<root>/e/fj-<evtid> и /<root>/e/jd/<guid>)

ZookeeperDiscovery. Обработка выхода узла

  • При выходе узла из топологии, удаляется его znode /<root>/n/guid-0000x

    • При штатном выходе узел удалает ее сам а также создает стоп флаг
      /<root>/sf/<guid>-0000x (с версии 2.9.1)

    • При аварийном выходе — zookeeper.

  • Координатор получает нотификацию о удалении узла.

    • Если не найдет стоп-флаг — создает событие о аварийном выходе узла.

    • Если найден — создает событие о штатном выходе узла (с версии 2.9.1) и удаляет стоп-флаг.

ZookeeperDiscovery. Отправка сообщения

  • Узел сериализует сообщение и сохраняет его в znode /<root>/ce/<node-guid>-0000x

  • Координатор получает оповещение об добавлении новых дочерних узлах в /<root>/ce

    • Координатор проверяет новые сообщения и пропускает если сообщение уже обработано

  • Координатор добавляет новое событие и собирает подтверждения от других узлов

  • Если событие обработано всеми узлами, лишние данные удаляются координатором.

ZookeeperDiscovery. Отправка сообщения (Ack Message)

  • Если сообщения подразумевает т.н. Ack Message

    • Координатор после обработки исходного сообщения всеми узлами сериализует Ack Message
      в /<root>/ca/<evt-id>

    • Координатор добавляет новое событие в /<root>/e

  • После обработки ack события другими узлами, координатор удаляет лишние данные.

ZookeeperDiscovery. Схема

zk znode structure

Материалы

zk book cover

ZooKeeper: Distributed Process Coordination
Flavio Junqueira, Benjamin Reed