Spark: дата-майнинг до 30-ти раз быстрее Hadoop

В Калифорнийском университете в Беркли разработали фреймворк Spark для распределённых вычислений в кластерах. На некоторых задачах он превосходит Hadoop в 10-30 раз, сохраняя при этом масштабируемость и надёжность MapReduce.

Увеличение производительности до 30х возможно на специфических задачах, в которых идёт постоянное обращение к одному и тому же набору данных. Например, это интерактивный дата-майнинг и итерационные алгоритмы, которые активно используются, например, в системах машинного обучения. Собственно, для этих двух задач проект и создавался. Но Spark превосходит Hadoop не только в системах машинного обучения, но и в традиционных приложениях по обработке данных.

Главная инновация в Spark - введение новой абстракции Resilient distributed datasets (RDD): это набор read-only объектов, распределённых по машинам кластера. Они восстанавливаются в случае сбоя диска и могут постоянно находиться в памяти. Например, при RDD размером до 39 Гб гарантируется скорость доступа менее 1 с.

Для упрощения программирования Spark интегрирован в синтаксис языка программирования Scala 2.8.1, так что можно легко манипулировать RDD словно локальными объектами. Кроме того, Spark запускается из-под менеджера Mesos, так что его можно использовать параллельно с Hadoop или другими фреймворками.

Вот некоторые примеры.

 


Поиск текста

val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()

Здесь происходит поиск сообщений об ошибке в логах. Красные фрагменты - процедуры замыкания Scala, которые автоматически передаются в кластер, синим обозначены операторы Spark.


Поиск текста в памяти

Spark может кэшировать RDD в памяти для ускорения работы и повторного обращения к этим наборам данных. Для предыдущего примера мы можем просто добавить одну строчку, которая будет кэшировать в памяти только сообщения об ошибках.

errors.cache()

После этого обработка такого типа данных значительно ускоряется.


Подсчёт количества слов

В данном примере показано несколько действий, чтобы создать набор данных с парами (String, Int) и записать его в файл.

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)
counts.saveAsText("hdfs://...")


Логистическая регрессия

Это статистическая модель, используемая для предсказания вероятности возникновения некоторого события путём подгонки данных к логистической кривой. Данный итерационный алгоритм широко используется в системах машинного обучения, но может найти применение и в других приложениях, например, в распознавании спама. Этот алгоритм особенно выигрывает от кэширования входящих данных в оперативной памяти.

val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D)
// current separating plane
for (i <- 1 to ITERATIONS) {
 val gradient = points.map(p =>
  (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
 ).reduce(_ + _)
 w -= gradient
}
println("Final separating plane: " + w)

На диаграмме показано сравнение производительности Spark и Hadoop при расчёте модели логистической регрессии на наборе данных 30 Гб в 80-ядерном кластере.

Анатолий АЛИЗАР

Версия для печатиВерсия для печати

Рубрики: 

  • 1
  • 2
  • 3
  • 4
  • 5
Всего голосов: 0
Заметили ошибку? Выделите ее мышкой и нажмите Ctrl+Enter!