Анализ задержки рейсов с помощью Scylla на базе Spark

Анализ задержки рейсов с помощью Scylla на базе Spark

Аватар пользователя Диана Сосновская
Диана Сосновская
06 июня 2017

В этой статье мы расскажем об эффективном тандеме Spark Scala API и Scylla, который позволит получить мгновенный результат. Из статьи вы узнаете как вычислить среднее время прибытия/отбытия рейсов или их отмены в течение одного года на основе статистики RITA, если конкретнее, то речь пойдет о средней задержке прибытия/отбытия и отмене вылетов для каждого авиаперевозчика. Данные, полученные на сайте RITA, содержат информацию о вылетах и прилетах всех коммерческих рейсов, осуществленных крупными авиакомпаниями на территории США в период с 1987 по 2008 гг. Это примерно 120 млн. записей в формате csv-данных, а объем не распакованного формата порядка 120 Гб.

Проект состоит из двух логических модулей:

  • загрузчика, который загружает данные из файла /tmp/2008.csv в ScyllaDB;

  • экстрактора, который обрабатывает запрошенные сведения и извлекает их из большого объема данных. Для демонстрации нашего примера запустим несколько запросов, используя Spark Scala API. В качестве примера проанализируем пункты назначения и авиакомпании-перевозчики с наибольшим количеством задержек и отмен вылетов.

Scylla в Spark: пошаговая инструкция

Начальные настройки

Разворачиваем Scylla. Вы можете установить Scylla любым из предложенных способов.

Например, для установки Scylla мы используем docker (выбраны параметры по умолчанию):

Запускаем nodetool и cqlsh. Убеждаемся, что Scylla работает:

Удостоверились в том, что cql функционирует нормально:

Настраиваем Scylla (*выборочно). По умолчанию Scylla задействует все процессорные ресурсы и память. Чтобы настроить ограничение доступа к ресурсам, мы использовали команды:

Ниже - пример строгого ограничения для Scylla:

Подготовка данных

Следующим шагом мы подготавливаем необходимые данные: загружаем и распаковываем данные из архива. Чтобы наш эксперимент был интереснее, мы использовали публичные базы данных, в частности - сведения Статистического вычислительного центра за 2008 г. Для этого запускаем в терминале следующие команды:

Следующий шаг - запускаем следующие скрипты в cqlsh:

Установка Java, Scala, Sbt

Устанавливаем OpenJDK. Для установки OpenJDK в Ubuntu или Debian, используя apt-get, выполните в терминале следующую команду:

Для установки OpenJDK в Fedora, Oracle или Redhat необходимо выполнить команду:

Устанавливаем Sbt. Чтобы установить Sbt в Ubuntu или Debian, выполните в терминале следующие команды:

Для инсталляции Sbt в Fedora, Oracle или Red Hat пишем следующие команды:

Устанавливаем Scala. Для установки Scala, выполните в терминале следующие команды:

Также добавляем следующий текст в файл:

Подготовим Spark. Для того, чтобы установить Apache Spark, мы выполняем в терминале следующее:

Подготовим приложение. Данные из RITA - это большой массив данных, более 120 миллионов записей, что составляет 1.6 Гб сжатых данных на диске. Для нашего примера мы берем всего лишь 7 миллионов строк. Чтобы подготовить приложение для запуска, мы должны скачать rita-analyzer из github-репозитория.

Первое, скачаем с репозитория папку rita-analyzer:

Второе, необходимо скомпилировать приложение. Для этого в каталоге rita-analyzer выполним команду:

Это позволило загрузить все необходимые зависимости и создать выходной jar-файл:


В-третьих, настроим файл spark-scylla.conf. Если мы собираемся запустить Spark и Scylla на той же самой ноде, мы должны будем ограничить количество ядер, используемых Spark, до двух, добавив в spark-scylla.conf следующую строку:

где local означает, что Spark запускается локально в одном рабочем потоке (т.е. без параллелизма вообще).

Следующий шаг – это загрузка ранее скаченных данных из файла /tmp/2008.csv в ScyllaDB и выполнение следующей команды в каталоге Spark:

Данные из файла /tmp/2008.csv в Scylla будем загружать порциями по 1000 строк. На уровне кода эта ункциональность будет выглядеть следующим образом:

В консоли терминала мы можем увидеть некоторые лог-данные и наблюдать процесс выбора данных в Spark UI (http://localhost:4040). Проверить данные в Scylla можно как во время работы приложения, так и после завершения его работы (мы ограничили результаты выборки до 10 первых строк):

Результат должен выглядеть примерно следующим образом:

Запуск приложения

В первом примере, расположенном ниже, мы продемонстрируем, как использовать Spark API Scala в сочетании с ScyllaDB для получения данных из публичного хранилища данных о среднем времени прибытия/отбытия рейсов или их отмены в течение одного года на основе общедоступной статистики RITA.
Возможны пять вариантов использования:

  • Топ-3 пункта назначения с наибольшей средней задержкой прибытия
  • Топ-3 точек отправления с наибольшей средней задержкой отправления
  • Топ-3 авиакомпании с наибольшей средней задержкой отправления и прибытия
  • Топ-3 перевозчика с наиболее частой отменой рейсов
  • Топ-3 авиакомпании с минимальной средней задержкой отправления

Топ-3 пункта назначения с наибольшей средней задержкой прибытия

В данном случае мы вычислим пункты назначения с максимальной средней задержкой прибытия. Для этого нам понадобятся данные о перевозчиках с наибольшим показателем задержки для выбранных пунктов прибытия/отправления, при вылете из которых наблюдалось наибольшее число задержек.

Мы следуем следующему алгоритму:

  • 1. Фильтруем по условию: поле arrdelay > 0
  • 2. Группируем по полю dest
  • 3. Для каждой группы мы:
  • 3.1. Вычисляем среднее время задержки прибытия List[{rows of rita data}].map(x => x.arrdelay).sum / List[{rows of rita data}].size.
  • 3.2. Сгруппируем по полю carrier.
    • 3.2.1. Для каждой подгруппы, нам вычисляем среднее время задержки прибытия по принципу пункта 3.1.
    • 3.2.2. Отсортируем его по descending.
    • 3.2.3. Далее берем первые 3 записи
    • 3.3. И группируем их по полю origin.
      • 3.3.1. Для каждой подгруппы вычисляем среднюю задержку прибытия так же, как в пункте 3.1 для групп.
      • 3.3.2. Отсортируем по descending.
      • 3.3.3. Опять берем первые 3 записи
    • 4. Отсортируем их по descending (по средней задержке прибытия).
    • 5. Берем первые 3 записи, и результат будет выглядеть следующим образом:
    Пункт назначения: MQT (Мичиганский аэропорт), среднее время задержки прибытия: 68 мин… Топ-3 авиакомпании с наибольшей средней задержкой прибытия: … Авиакомпания: MQ, среднее время задержки прибытия: 68 мин…. Топ-3 точек отправления с наибольшей средней задержкой отправления: …… из точки отправления: MKE, среднее время задержки прибытия: 78 мин…… из точки отправления: GRB, среднее время задержки прибытия: 73мин… из точки отправления: ORD, среднее время задержки прибытия: 54 мин… и т.д.

    Где: MQ – Envoy Air, Американская авиакомпания MKE – Международный аэропорт имени генерала Митчелла, Милуоки, Висконсин GRB - Международный аэропорт Грин Бэй-Остин Страубел, Остин, Висконсин ORD - Международный аэропорт О’Хара, Чикаго, Иллинойс Ниже продемонстрирован исходный код описанного случая на Spark:
    
    

    Исходный код можно найти здесь.

    Топ-3 точек отправления с наибольшей средней задержкой отправления

    В следующем примере определем места вылета с максимальной средней задержкой отправления. Для этого мы получим данные об авиакомпаниях с максимальным показателем отсрочки вылета. Основные шаги такие же, что и в первом примере.
    Результаты ниже – это то, что мы можем получить с помощью алгоритма, реализованного с помощью следующего исходного кода.

    Где:  CMX - Аэропорт Хоутон Каунти Мемориал; 9E - Endeavor Air, региональная авиакомпания Соединённых Штатов; MSP - Международный аэропорт Миннеаполис/Сент-Пол.

    Топ-3 авиакомпании с наибольшей средней задержкой отправления и прибытия

    Найдем авиакомпании с максимальной средней задержкой времени отправлениия / прибытия. Исходный код алгоритма можно найти здесь.

    Где: YV - Mesa Airlines, региональная авиакомпания Соединённых Штатов Америки;
    B6 - JetBlue Airways, бюджетная американская авиакомпания;  OH - PSA Airlines, региональная авиакомпания Соединённых Штатов Америки.

    Топ-3 перевозчика с наиболее частой отменой рейсов

    Найдем также авиакомпании с максимальным количеством отмененных рейсов. Исходный код алгоритма так же можете найти по ссылке.
    В результате мы получили:

    Где: MQ – American Eagle Airlines, Американская авиакомпания; OO - SkyWest Airlines, региональная авиакомпания США; AA - American Airlines, американская авиакомпания.

    Топ-3 авиакомпании с минимальной средней задержкой отправления

    В следующем примере мы найдем авиакомпании с минимальной средней задержкой отправления таким же способом, как мы использовали в третьем примере. Результат ниже – это то, что мы получили при помощи алгоритма, созданного на базе следующего исходного кода:
    Вот вывод данных работы алгоритма:

    Где: HA - Hawaiian Airlines, американская авиакомпания; US - US Airways, американская авиакомпания;
    F9 - Frontier Airlines, магистральная авиакомпания США.

    Таким образом, для получения результатов, представленных выше, мы должны запустить следующую команду в терминале в Spark каталоге:

    Заключение

    Конечно, Scylla не поможет вам избежать задержек рейсов во время путешествий, но она способна сократить время ожидания, размер кластера и другие метрики. В авиабизнесе быстрая обработка данных – ключ к обработке и анализу событий в реальном времени. Благодаря своей высокой производительности Scylla позволяет сделать это в рекордно короткие сроки. Более того, разработчикам не нужно менять структуру данных для уже существующих проектов в Cassandra. Они могут использовать их напрямую в Scylla.

    Задать вопрос