parallel collections

Обзор

Авторы оригинала: Aleksandar Prokopec, Heather Miller

Перевод Анастасии Маркиной

Мотивация

Пока производители процессоров в последние годы дружно переходили от одноядерных к многоядерным архитектурам, научное и производственное сообщества не менее дружно признали, что многопоточное программирование по-прежнему трудно сделать популярным.

Чтобы упростить написание многопоточных программ, в стандартную библиотеку Scala были включены параллельные коллекции, которые скрыли от пользователей низкоуровневые подробности параллелизации, дав им привычную высокоуровневую абстракцию. Надежда была (и остается) на то, что скрытая под уровнем абстракции параллельность позволит на шаг приблизиться к ситуации, когда среднестатистический разработчик будет повседневно использовать в работе надежно исполняемый параллельный код.

Идея проста: коллекции – хорошо понятная и часто используемая программистами абстракция. Упорядоченность коллекций позволяет эффективно и прозрачно (для пользователя) обрабатывать их параллельно. Позволив пользователю “подменить” последовательные коллекции на те, что обрабатываются параллельно, решение Scala делает большой шаг вперед к охвату большего количества кода возможностями параллельной обработки.

Рассмотрим следующий пример, где мы исполняем монадическую операцию на некоторой большой последовательной коллекции:

val list = (1 to 10000).toList
list.map(_ + 42)

Чтобы выполнить ту же самую операцию параллельно, требуется просто вызвать метод par на последовательной коллекции list. После этого можно работать с параллельной коллекцией так же, как и с последовательной. То есть, пример выше примет вид:

list.par.map(_ + 42)

Библиотека параллельных коллекций Scala тесно связана с “последовательной” библиотекой коллекций Scala (представлена в версии 2.8), во многом потому, что последняя служила вдохновением к ее дизайну. Он предоставляет параллельную альтернативу ряду важных структур данных из библиотеки (последовательных) коллекций Scala, в том числе:

  • ParArray
  • ParVector
  • mutable.ParHashMap
  • mutable.ParHashSet
  • immutable.ParHashMap
  • immutable.ParHashSet
  • ParRange
  • ParTrieMap (collection.concurrent.TrieMap впервые в версии 2.10)

Библиотека параллельных коллекций Scala расширяема также как и последовательные коллекции, представленные в стандартной библиотеке. Другими словами, как и в случае с обычными последовательными коллекциями, пользователи могут внедрять свои собственные типы коллекций, автоматически наследуя все предопределенные (параллельные) операции, доступные для других параллельных коллекций в стандартной библиотеке.

Несколько примеров

Попробуем изобразить всеобщность и полезность представленных коллекций на ряде простых примеров, для каждого из которых характерно прозрачно-параллельное выполнение.

Примечание: Некоторые из последующих примеров оперируют небольшими коллекциями, для которых такой подход не рекомендуется. Они должны рассматриваться только как иллюстрация. Эвристически, ускорение становится заметным, когда размер коллекции дорастает до нескольких тысяч элементов. (Более подробно о взаимосвязи между размером коллекции и производительностью, смотрите соответствующий подраздел раздела, посвященного производительности в данном руководстве.)

map

Используем параллельную map для преобразования набора строк String в верхний регистр:

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.map(_.toUpperCase)
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)

fold

Суммируем через fold на ParArray:

scala> val parArray = (1 to 10000).toArray.par    
parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ...

scala> parArray.fold(0)(_ + _)
res0: Int = 50005000

filter

Используем параллельный filter для отбора фамилий, которые начинаются с буквы “J” или стоящей дальше в алфавите:

scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.filter(_.head >= 'J')
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)

Создание параллельной коллекции

Параллельные коллекции предназначались для того, чтобы быть использованными таким же образом, как и последовательные; единственное значимое отличие – в методе получения параллельной коллекции.

В общем виде, есть два варианта создания параллельной коллекции:

Первый, с использованием ключевого слова new и подходящего оператора import:

import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]

Второй, преобразованием последовательной коллекции:

val pv = Vector(1,2,3,4,5,6,7,8,9).par

Разовьем эту важную мысль – последовательные коллекции можно конвертировать в параллельные вызовом метода par на последовательных, и, соответственно, параллельные коллекции также можно конвертировать в последовательные вызовом метода seq на параллельных.

На заметку: Коллекции, являющиеся последовательными в силу наследования (в том смысле, что доступ к их элементам требуется получать по порядку, один элемент за другим), такие, как списки, очереди и потоки (streams), преобразовываются в свои параллельные аналоги копированием элементов в соответствующие параллельные коллекции. Например, список List конвертируется в стандартную неизменяемую параллельную последовательность, то есть в ParVector. Естественно, что копирование, которое для этого требуется, вносит дополнительный расход производительности, которого не требуют другие типы коллекций, такие как Array, Vector, HashMap и т.д.

Больше информации о конвертировании можно найти в разделах преобразования и конкретные классы параллельных коллекций этого руководства.

Семантика

В то время, как абстракция параллельной коллекции заставляет думать о ней так, как если бы речь шла о нормальной последовательной коллекции, важно помнить, что семантика различается, особенно в том, что касается побочных эффектов и неассоциативных операций.

Для того, чтобы увидеть, что происходит, для начала представим, как именно операции выполняются параллельно. В концепции, когда фреймворк параллельных коллекций Scala распараллеливает операцию на соответствующей коллекции, он рекурсивно “разбивает” данную коллекцию, параллельно выполняет операцию на каждом разделе коллекции, а затем “комбинирует” все полученные результаты.

Эти многопоточные, “неупорядоченные” семантики параллельных коллекций приводят к следующим скрытым следствиям:

  1. Операции, имеющие побочные эффекты, могут нарушать детерминизм
  2. Неассоциативные операции могут нарушать детерминизм

Операции, имеющие побочные эффекты.

Вследствие использования фреймворком параллельных коллекций семантики многопоточного выполнения, в большинстве случаев для соблюдения детерминизма требуется избегать выполнения на коллекциях операций, которые выполняют побочные действия. В качестве простого примера попробуем использовать метод доступа foreach для увеличения значения переменной var, объявленной вне замыкания, которое было передано foreach.

scala> var sum = 0
sum: Int = 0

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.foreach(sum += _); sum
res01: Int = 467766

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res02: Int = 457073    

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res03: Int = 468520    

В примере видно, что несмотря на то, что каждый раз sum инициализируется в 0, при каждом новом вызове foreach на предложенном list, sum получает различное значение. Источником недетерминизма является так называемая гонка– параллельное чтение/запись одной и той же изменяемой переменной.

В примере выше возможен случай, когда два потока прочитают одно и то же значение переменной sum, потратят некоторое время на выполнение операции над этим значением sum, а потом попытаются записать новое значение в sum, что может привести к перезаписи (а следовательно, к потере) значимого результата, как показано ниже:

Поток A: читает значение sum, sum = 0                значение sum: 0
Поток B: читает значение sum, sum = 0                значение sum: 0
Поток A: увеличивает sum на 760, пишет sum = 760     значение sum: 760
Поток B: увеличивает sum на 12, пишет sum = 12       значение sum: 12

Приведенный выше пример демонстрирует сценарий, где два потока успевают прочитать одно и то же значение, 0, прежде чем один или другой из них успеет прибавить к этому 0 элемент из своего куска параллельной коллекции. В этом случае Поток A читает 0 и прибавляет к нему свой элемент, 0+760, в то время, как Поток B прибавляет 0 к своему элементу, 0+12. После того, как они вычислили свои суммы, каждый из них записывает свое значение в sum. Получилось так, что Поток A успевает записать значение первым, только для того, чтобы это помещенное в sum значение было практически сразу же перезаписано потоком B, тем самым полностью перезаписав (и потеряв) значение 760.

Неассоциативные операции

Из-за “неупорядоченной” семантики, нелишней осторожностью становится требование выполнять только ассоциативные операции во избежание недетерминированности. То есть, если мы имеем параллельную коллекцию pcoll, нужно убедиться, что при вызове на pcoll функции более высокого уровня, такой как pcoll.reduce(func), порядок, в котором func применяется к элементам pcoll, может быть произвольным. Простым и очевидным примером неассоциативной операции является вычитание:

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.reduce(_-_)
res01: Int = -228888

scala> list.reduce(_-_)
res02: Int = -61000

scala> list.reduce(_-_)
res03: Int = -331818

В примере выше, мы берем ParVector[Int], вызываем функцию reduce, и передаем ей _-_, которая просто берет два неименованных элемента, и вычитает один из другого. Вследствие того, что фреймворк параллельных коллекций порождает потоки и независимо выполняет reduce(_-_) на разных частях коллекции, результат двух запусков reduce(_-_) на одной и той же коллекции не будет одним и тем же.

Примечание: Часто возникает мысль, что так же, как и в случае с неассоциативными, некоммутативные операции, переданные в более высокую уровнем функцию на параллельной коллекции, приводят к недетеминированному поведению. Это неверно, простой пример – конкатенация строк – ассоциативная, но некоммутативная операция:

scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par
strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz) 

scala> val alphabet = strings.reduce(_++_)
alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz

“Неупорядоченная” семантика параллельных коллекций означает только то, что операции будут выполнены не по порядку (во временном отношении. То есть, не последовательно), она не означает, что результат будет “перемешан” относительно изначального порядка (в пространственном отношении). Напротив, результат будет практически всегда пересобран по-порядку– то есть, параллельная коллекция, разбитая на части в порядке A, B, C, будет снова объединена в том же порядке A, B, C, а не в каком-то произвольном, например, B, C, A.

Если требуется больше информации о том, как разделяются и комбинируются операции на различных типах коллекций, посетите раздел Архитектура этого руководства.