Для тестирования эволюции схем состояния в Apache Flink, когда используется пользовательский сериализатор, например JacksonStateSerializer, необходимо создать два разных classloader'а, каждый из которых будет содержать свою версию класса состояния. Это позволяет имитировать ситуацию, когда схема состояния меняется между двумя запусками приложения. Тест состоит из двух этапов. На первом этапе создается и сохраняется состояние с первой версией схемы, а на втором этапе происходит восстановление состояния с новой версией схемы.
Для реализации этого процесса используется ряд вспомогательных компонентов.
Сам тест включает в себя создание Flink MiniCluster с первой версией схемы, сохранение состояния с использованием этой схемы и снятие savepoint'а. Далее запускается новый Flink MiniCluster с другой версией схемы, восстановлением состояния из savepoint'а и проверкой, что данные успешно десериализовались в новую схему. Это достигается путем добавления в classpath каждой джобы своей версии схемы с помощью URLClassLoader.
Ключевым моментом теста является то, что используются разные ClassLoader'ы, что позволяет избежать конфликта имен классов. Это дает возможность независимо манипулировать разными версиями схемы состояния и тестировать эволюцию схемы в различных направлениях (v1 -> v2 и v2 -> v1). Для инициализации объектов в тесте применяется библиотека EasyRandom, которая генерирует случайные объекты по схеме. Это позволяет получить объект нужной версии схемы, сгенерировав его на основании загруженного класса.
Изображение носит иллюстративный характер
Для реализации этого процесса используется ряд вспомогательных компонентов.
MiniClusterUtils
обеспечивает запуск Flink MiniCluster с возможностью указания пользовательского classpath. TestStatefulMapCounter
представляет собой простой stateful оператор, который инкрементирует счетчик при повторном обращении к состоянию. ObjectTypeClassLoaderDto
хранит объект с типом, загруженным определенным classloader'ом. StateClassLoadingUtil
компилирует классы из текстовых файлов с исходным кодом, а ClassLoaderUtils
выполняет запись исходников в файлы, компиляцию и создание ClassLoader. Сам тест включает в себя создание Flink MiniCluster с первой версией схемы, сохранение состояния с использованием этой схемы и снятие savepoint'а. Далее запускается новый Flink MiniCluster с другой версией схемы, восстановлением состояния из savepoint'а и проверкой, что данные успешно десериализовались в новую схему. Это достигается путем добавления в classpath каждой джобы своей версии схемы с помощью URLClassLoader.
Ключевым моментом теста является то, что используются разные ClassLoader'ы, что позволяет избежать конфликта имен классов. Это дает возможность независимо манипулировать разными версиями схемы состояния и тестировать эволюцию схемы в различных направлениях (v1 -> v2 и v2 -> v1). Для инициализации объектов в тесте применяется библиотека EasyRandom, которая генерирует случайные объекты по схеме. Это позволяет получить объект нужной версии схемы, сгенерировав его на основании загруженного класса.