Анализ задержки рейсов с помощью Scylla на базе Spark
Проект состоит из двух логических модулей:
-
загрузчика, который загружает данные из файла /tmp/2008.csv в ScyllaDB;
-
экстрактора, который обрабатывает запрошенные сведения и извлекает их из большого объема данных. Для демонстрации нашего примера запустим несколько запросов, используя Spark Scala API. В качестве примера проанализируем пункты назначения и авиакомпании-перевозчики с наибольшим количеством задержек и отмен вылетов.
Scylla в Spark: пошаговая инструкция
Начальные настройки
Разворачиваем Scylla. Вы можете установить Scylla любым из предложенных способов.
Например, для установки Scylla мы используем docker (выбраны параметры по умолчанию):
Запускаем nodetool и cqlsh. Убеждаемся, что Scylla работает:
Удостоверились в том, что cql функционирует нормально:
Настраиваем Scylla (*выборочно). По умолчанию Scylla задействует все процессорные ресурсы и память. Чтобы настроить ограничение доступа к ресурсам, мы использовали команды:
Ниже - пример строгого ограничения для Scylla:
Подготовка данных
Следующим шагом мы подготавливаем необходимые данные: загружаем и распаковываем данные из архива. Чтобы наш эксперимент был интереснее, мы использовали публичные базы данных, в частности - сведения Статистического вычислительного центра за 2008 г. Для этого запускаем в терминале следующие команды:
Следующий шаг - запускаем следующие скрипты в cqlsh:
Установка Java, Scala, Sbt
Устанавливаем OpenJDK. Для установки OpenJDK в Ubuntu или Debian, используя apt-get, выполните в терминале следующую команду:
Для установки OpenJDK в Fedora, Oracle или Redhat необходимо выполнить команду:
Устанавливаем Sbt. Чтобы установить Sbt в Ubuntu или Debian, выполните в терминале следующие команды:
Для инсталляции Sbt в Fedora, Oracle или Red Hat пишем следующие команды:
Устанавливаем Scala. Для установки Scala, выполните в терминале следующие команды:
Также добавляем следующий текст в файл:
Подготовим Spark. Для того, чтобы установить Apache Spark, мы выполняем в терминале следующее:
Подготовим приложение. Данные из RITA - это большой массив данных, более 120 миллионов записей, что составляет 1.6 Гб сжатых данных на диске. Для нашего примера мы берем всего лишь 7 миллионов строк. Чтобы подготовить приложение для запуска, мы должны скачать rita-analyzer из github-репозитория.
Первое, скачаем с репозитория папку rita-analyzer:
Второе, необходимо скомпилировать приложение. Для этого в каталоге rita-analyzer выполним команду:
Это позволило загрузить все необходимые зависимости и создать выходной jar-файл:
В-третьих, настроим файл spark-scylla.conf. Если мы собираемся запустить Spark и Scylla на той же самой ноде, мы должны будем ограничить количество ядер, используемых Spark, до двух, добавив в spark-scylla.conf следующую строку:
где local означает, что Spark запускается локально в одном рабочем потоке (т.е. без параллелизма вообще).
Следующий шаг – это загрузка ранее скаченных данных из файла /tmp/2008.csv в ScyllaDB и выполнение следующей команды в каталоге Spark:
Данные из файла /tmp/2008.csv в Scylla будем загружать порциями по 1000 строк. На уровне кода эта ункциональность будет выглядеть следующим образом:
В консоли терминала мы можем увидеть некоторые лог-данные и наблюдать процесс выбора данных в Spark UI (http://localhost:4040). Проверить данные в Scylla можно как во время работы приложения, так и после завершения его работы (мы ограничили результаты выборки до 10 первых строк):
Результат должен выглядеть примерно следующим образом:
Запуск приложения
В первом примере, расположенном ниже, мы продемонстрируем, как использовать Spark API Scala в сочетании с ScyllaDB для получения данных из публичного хранилища данных о среднем времени прибытия/отбытия рейсов или их отмены в течение одного года на основе общедоступной статистики RITA.
Возможны пять вариантов использования:
- Топ-3 пункта назначения с наибольшей средней задержкой прибытия
- Топ-3 точек отправления с наибольшей средней задержкой отправления
- Топ-3 авиакомпании с наибольшей средней задержкой отправления и прибытия
- Топ-3 перевозчика с наиболее частой отменой рейсов
- Топ-3 авиакомпании с минимальной средней задержкой отправления
Топ-3 пункта назначения с наибольшей средней задержкой прибытия
В данном случае мы вычислим пункты назначения с максимальной средней задержкой прибытия. Для этого нам понадобятся данные о перевозчиках с наибольшим показателем задержки для выбранных пунктов прибытия/отправления, при вылете из которых наблюдалось наибольшее число задержек.
Мы следуем следующему алгоритму:
- 1. Фильтруем по условию: поле arrdelay > 0
- 2. Группируем по полю dest
- 3. Для каждой группы мы:
- 3.1. Вычисляем среднее время задержки прибытия List[{rows of rita data}].map(x => x.arrdelay).sum / List[{rows of rita data}].size.
- 3.2. Сгруппируем по полю carrier.
-
- 3.2.1. Для каждой подгруппы, нам вычисляем среднее время задержки прибытия по принципу пункта 3.1.
- 3.2.2. Отсортируем его по descending.
- 3.2.3. Далее берем первые 3 записи
- 3.3. И группируем их по полю origin.
-
- 3.3.1. Для каждой подгруппы вычисляем среднюю задержку прибытия так же, как в пункте 3.1 для групп.
- 3.3.2. Отсортируем по descending.
- 3.3.3. Опять берем первые 3 записи
- 4. Отсортируем их по descending (по средней задержке прибытия).
- 5. Берем первые 3 записи, и результат будет выглядеть следующим образом:
Где: MQ – Envoy Air, Американская авиакомпания MKE – Международный аэропорт имени генерала Митчелла, Милуоки, Висконсин GRB - Международный аэропорт Грин Бэй-Остин Страубел, Остин, Висконсин ORD - Международный аэропорт О’Хара, Чикаго, Иллинойс Ниже продемонстрирован исходный код описанного случая на Spark:Исходный код можно найти здесь.
Топ-3 точек отправления с наибольшей средней задержкой отправления
В следующем примере определем места вылета с максимальной средней задержкой отправления. Для этого мы получим данные об авиакомпаниях с максимальным показателем отсрочки вылета. Основные шаги такие же, что и в первом примере.
Результаты ниже – это то, что мы можем получить с помощью алгоритма, реализованного с помощью следующего исходного кода.Где: CMX - Аэропорт Хоутон Каунти Мемориал; 9E - Endeavor Air, региональная авиакомпания Соединённых Штатов; MSP - Международный аэропорт Миннеаполис/Сент-Пол.
Топ-3 авиакомпании с наибольшей средней задержкой отправления и прибытия
Найдем авиакомпании с максимальной средней задержкой времени отправлениия / прибытия. Исходный код алгоритма можно найти здесь.
Где: YV - Mesa Airlines, региональная авиакомпания Соединённых Штатов Америки;
B6 - JetBlue Airways, бюджетная американская авиакомпания; OH - PSA Airlines, региональная авиакомпания Соединённых Штатов Америки.Топ-3 перевозчика с наиболее частой отменой рейсов
Найдем также авиакомпании с максимальным количеством отмененных рейсов. Исходный код алгоритма так же можете найти по ссылке.
В результате мы получили:Где: MQ – American Eagle Airlines, Американская авиакомпания; OO - SkyWest Airlines, региональная авиакомпания США; AA - American Airlines, американская авиакомпания.
Топ-3 авиакомпании с минимальной средней задержкой отправления
В следующем примере мы найдем авиакомпании с минимальной средней задержкой отправления таким же способом, как мы использовали в третьем примере. Результат ниже – это то, что мы получили при помощи алгоритма, созданного на базе следующего исходного кода:
Вот вывод данных работы алгоритма:Где: HA - Hawaiian Airlines, американская авиакомпания; US - US Airways, американская авиакомпания;
F9 - Frontier Airlines, магистральная авиакомпания США.Таким образом, для получения результатов, представленных выше, мы должны запустить следующую команду в терминале в Spark каталоге:
Заключение
Конечно, Scylla не поможет вам избежать задержек рейсов во время путешествий, но она способна сократить время ожидания, размер кластера и другие метрики. В авиабизнесе быстрая обработка данных – ключ к обработке и анализу событий в реальном времени. Благодаря своей высокой производительности Scylla позволяет сделать это в рекордно короткие сроки. Более того, разработчикам не нужно менять структуру данных для уже существующих проектов в Cassandra. Они могут использовать их напрямую в Scylla.