-Поиск по дневнику

Поиск сообщений в rss_rss_hh_new

 -Подписка по e-mail

 

 -Статистика

Статистика LiveInternet.ru: показано количество хитов и посетителей
Создан: 17.03.2011
Записей:
Комментариев:
Написано: 51


Запуск регулярных задач на кластере или как подружить Apache Spark и Oozie

Четверг, 28 Сентября 2017 г. 22:45 + в цитатник
entony сегодня в 22:45 Разработка

Запуск регулярных задач на кластере или как подружить Apache Spark и Oozie


    Давно уже витала в воздухе необходимость реализовать запуск регулярных Spark задач через Oozie, но всё руки не доходили и вот наконец свершилось. В этой статье хочу описать весь процесс, возможно она упростит Вам жизнь.


    Содержание


    • Задача
    • Оборудование и установленное ПО
    • Написание Spark задачи
    • Написание workflow.xml
    • Написание coordinator.xml
    • Размещение проекта на hdfs
    • Запуск регулярного выполнения
    • Заключение

    Задача


    Мы имеем следующую структуру на hdfs:


    hdfs://hadoop/project-MD2/data
    hdfs://hadoop/project-MD2/jobs
    hdfs://hadoop/project-MD2/status

    В директория data ежедневно поступают данные и раскладываются по директориям в соответствие с датой. Например, данные за 31.12.2017 запишутся по следующему пути: hdfs://hadoop/project/data/2017/12/31/20171231.csv.gz.


    Формат входных данных

    • Разделитель строк: “\n”
    • Разделитель столбцов: “;”
    • Способ сжатия: gzip
    • Количество столбцов: 5 (device_id; lag_A0; lag_A1; flow_1; flow_2)
    • Заголовок в первой строке отсутствует
    • Данные за предыдущие сутки гарантированно записывается в соответствующую директория в интервал времени с 00:00 до 03:00 следующих суток.

    В директории jobs располагаются задачи, которые имеют непосредственное отношение к проекту. Нашу задачу мы также будем размещать в этом каталоге.
    В директорию status должна сохраняться статистика по количеству пустых полей за каждый день в формате json. Например, для данных за 31.12.2017 должен будет появиться файл hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json


    Примет json файла:

    {
       "device_id_count_empty" : 0, 
       "lag_A0_count_empty" : 10, 
       "lag_A1_count_empty" : 0, 
       "flow_1_count_empty" : 37, 
       "flow_2_count_empty" : 100
    }

    Оборудование и установленное ПО


    В нашем распоряжение есть кластер из 10 машин, каждая из которых имеет 8-и ядерный процессор и оперативную память в размере 64 Гб. Общий объём жёстких дисков на всех машинах 100 Тб. Для запуска задач на кластере отведена очередь PROJECTS.


    Установленное ПО:

    • Apache Hadoop 2.7.3 (Hortonworks)
    • Apache Spark 2.0.0
    • Apache Oozie 4.2.0
    • Scala 2.11.11
    • Sbt 1.0.2

    Написание Spark задачи


    Создадим структуру проекта, это можно очень просто сделать в любой среде разработки, поддерживающей scala или из консоли, как показано ниже:


    mkdir -p daily-statistic/project
    echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
    echo "" > daily-statistic/project/plugins.sbt
    echo "" > daily-statistic/build.sbt
    mkdir -p daily-statistic/src/main/scala

    Замечательно, теперь добавил плагин для сборки, для этого в файле daily-statistic/project/plugins.sbt добавляем следующую строку:


    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

    Добавил описание проекта, зависимости и особенности сборки в файл daily-statistic/build.sbt:


    name := "daily-statistic"
    
    version := "0.1"
    
    scalaVersion := "2.11.11"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
    )
    
    assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

    Перейдём в директорию daily-statistic и выполнил команду sbt update, для обновления проекта и подтягивания зависимостей из репозитория.
    Создаём Statistic.scala в директории src/main/scala/ru/daily


    Код задачи:

    package ru.daily
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    
    object Statistic extends App {
    
       // инициализация   
       implicit lazy val spark: SparkSession = SparkSession.builder()
         .appName("daily-statistic")
         .getOrCreate()
    
       import spark.implicits._
    
       val workDir = args(0)
       val datePart = args(1)
       val saveDir = args(2)
    
       try {
    
          val date = read(s"$workDir/$datePart/*.csv.gz")
             .select(
                '_c0 as "device_id",
                '_c1 as "lag_A0",
                '_c2 as "lag_A1",
                '_c3 as "flow_1",
                '_c4 as "flow_2"
             )
    
             save(s"$saveDir/$datePart", agg(date))
    
       } finally spark.stop()
    
       // чтение исходных данных   
       def read(path: String)(implicit spark: SparkSession): DataFrame = {
    
          val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")
    
          spark.read
             .options(inputFormat)
             .csv(path)
       }
    
       // построение агрегата
       def agg(data: DataFrame):DataFrame = data
          .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
          .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
          .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
          .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
          .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
          .agg(
             sum('device_id_empty) as "device_id_count_empty",
             sum('lag_A0_empty) as "lag_A0_count_empty",
             sum('lag_A1_empty) as "lag_A1_count_empty",
             sum('flow_1_empty) as "flow_1_count_empty",
             sum('flow_2_empty) as "flow_2_count_empty"
          )
    
       // сохранение результата
       def save(path: String, data: DataFrame): Unit = data.write.json(path)
    
    } 

    Собираем проект командой sbt assembly из директории daily-statistic. После успешного завершения сборки в директории daily-statistic/target/scala-2.11 появится пакет с задачей daily-statistic-0.1.jar.


    Написание workflow.xml


    Для запуска задачи через Oozie нужно описать конфигурацию запуска в файле workflow.xml. Ниже привожу пример для нашей задачи:


    
    
       
          
             
                oozie.launcher.mapred.job.queue.name
                ${queue}
             
          
       
    
       
    
       
          
             ${jobTracker}
             ${nameNode}
             yarn-client
             project-md2-daily-statistic
             ru.daily.Statistic
             ${nameNode}${jobDir}/lib/daily-statistic-0.1.jar
             
                --queue ${queue}
                --master yarn-client
                --num-executors 5
                --conf spark.executor.cores=8
                --conf spark.executor.memory=10g
                --conf spark.executor.extraJavaOptions=-XX:+UseG1GC
                --conf spark.yarn.jars=*.jar
                --conf spark.yarn.queue=${queue}
             
             ${nameNode}${dataDir}
             ${datePartition}
             ${nameNode}${saveDir}
           
    
           
           
    
       
    
       
          Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]
       
    
       
    
    

    В блоке global устанавливается очередь, для MapReduce задачи которая будет находить нашу задачи и запускать её.
    В блоке action описывается действие, в нашем случае запуск spark задачи, и что нужно делать при завершении со статусом ОК или ERROR.
    В блоке spark определяется окружение, конфигурируется задача и передаются аргументы. Конфигурация запуска задачи описывается в блоке spark-opts. Параметры можно посмотреть в официальной документации
    Если задача завершается со статусом ERROR, то выполнение переходит в блок kill и выводится кратное сообщение об ошибки.
    Параметры в фигурных скобках, например ${queue}, мы будем определять при запуске.


    Написание coordinator.xml


    Для организации регулярного запуска нам потребуется ещё coordinator.xml. Ниже приведу пример для нашей задачи:


    
        
            
                ${workflowPath}
                
                    
                        datePartition
                        ${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}
                    
                
            
        
    

    Здесь из интересного, параметры frequency, start, end, которые определяют частоту выполнения, дату и время начала выполнения задачи, дату и время окончания выполнения задачи соответственно.
    В блоке workflow указывается путь к директории с файлом workflow.xml, который мы определим позднее при запуске.
    В блоке configuration определяется значение свойства datePartition, которое в данном случае равно текущей дате в формате yyyy/MM/dd минус 1 день.


    Размещение проекта на hdfs

    Как уже было сказано ранее нашу задачу мы будем размещать в директории hdfs://hadoop/project-MD2/jobs:


    hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
    hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
    hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
    hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib

    Здесь в принципе всё понятно без комментариев за исключением директории sharelib. В эту директорию мы положим все библиотеки, которые использовались в процессе создания зашей задачи. В нашем случае это все библиотеки Spark 2.0.0, который мы указывали в зависимостях проекта. Зачем это нужно? Дело в том, что в зависимостях проекта мы указали "provided". Это говорит системе сборки не нужно включать зависимости в проект, они будут предоставлены окружением запуска, но мир не стоит на месте, администраторы кластера могут обновить версию Spark. Наша задача может оказаться чувствительной к этому обновлению, поэтому для запуска будет использоваться набор библиотек из директории sharelib. Как это конфигурируется покажу ниже.


    Запуск регулярного выполнения


    И так сё готово к волнительному моменту запуска. Мы будем запускать задачу через консоль. При запуске нужно задать значения свойствам, которые мы использовали в xml файлах. Вынесем эти свойства в отдельный файл coord.properties:


    # описание окружения
    nameNode=hdfs://hadoop
    jobTracker=hadoop.host.ru:8032
    
    # путь к директории с файлом coordinator.xml
    oozie.coord.application.path=/project-MD2/jobs/daily-statistic
    
    # частота в минутах (раз в 24 часа)
    frequency=1440
    startTime=2017-09-01T07:00Z
    endTime=2099-09-01T07:00Z
    
    # путь к директории с файлом workflow.xml
    workflowPath=/project-MD2/jobs/daily-statistic
    
    # имя пользователя, от которого будет запускаться задача
    mapreduce.job.user.name=username
    user.name=username
    
    # директория с данными и для сохранения результата
    dataDir=/project-MD2/data 
    saveDir=/project-MD2/status
    jobDir=/project-MD2/jobs/daily-statistic 
    
    # очередь для запуска задачи
    queue=PROJECTS
    
    # использовать библиотеке из указанной директории на hdfs вместо системных
    oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
    oozie.use.system.libpath=false

    Замечательно, тереть всё готово. Запускаем регулярное выполнение командой:


    oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run

    После запуска в консоль выведется id задачи. Используя это id можно посмотреть информацию о статусе выполнения задачи:


    oozie job -info {job_id}

    Остановить задачу:


    oozie job -kill {job_id}

    Если Вы не знаете id задачи, то можно найти его, показав все регулярные задачи для вашего пользователя:


    oozie jobs -jobtype coordinator -filter user={user_name}

    Заключение


    Полечилось немного затянуто, но на мой взгляд лучше подробная инструкция чем квест-поиск по интернету. Надеюсь описанный опыт будет Вам полезен, спасибо за внимание!

    Original source: habrahabr.ru (comments, light).

    https://habrahabr.ru/post/338952/

    Метки:  

     

    Добавить комментарий:
    Текст комментария: смайлики

    Проверка орфографии: (найти ошибки)

    Прикрепить картинку:

     Переводить URL в ссылку
     Подписаться на комментарии
     Подписать картинку