Параллельная коллекция без компоновщиков
Определить параллельную коллекцию без определения ее компоновщика возможно, так же, как возможно определить собственную последовательную коллекцию без определения ее строителей (builders
). Вследствие отсутствия компоновщика получится, что методы трансформаций (т.е. map
, flatMap
, collect
, filter
, …) по умолчанию будут возвращать ближайшую по иерархии стандартную коллекцию. Например, диапазоны строителей не имеют, и поэтому создание отображения элементов диапазона– map
– создает вектор.
В следующем примере определим параллельную коллекцию-строку. Так как строки по сути являются неизменяемыми последовательностями, сделаем их класс наследником immutable.ParSeq[Char]
:
class ParString(val str: String)
extends immutable.ParSeq[Char] {
Затем определим методы, которые есть в любой неизменяемой последовательности:
def apply(i: Int) = str.charAt(i)
def length = str.length
Кроме того, мы должны решить, что будем возвращать в качестве последовательного аналога нашей параллельной коллекции. Пусть это будет класс WrappedString
:
def seq = new collection.immutable.WrappedString(str)
И наконец, требуется задать разделитель для наших параллельных строк. Назовем его ParStringSplitter
и сделаем его потомком разделителя последовательностей, то есть типа SeqSplitter[Char]
:
def splitter = new ParStringSplitter(str, 0, str.length)
class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
extends SeqSplitter[Char] {
final def hasNext = i < ntl
final def next = {
val r = s.charAt(i)
i += 1
r
}
В примере выше, ntl
отображает общую длину строки, i
– текущую позицию, и наконец s
– саму строку.
Итераторы (или разделители) параллельных коллекций требуют еще несколько методов помимо next
и hasNext
, характерных для итераторов последовательных коллекций. Для начала, у них есть метод remaining
, возвращающий количество элементов, которые данному разделителю еще предстоит перебрать. Затем, метод dup
, дублирующий текущий разделитель.
def remaining = ntl - i
def dup = new ParStringSplitter(s, i, ntl)
И наконец, методы split
и psplit
, которые используются для создания разделителей, перебирающих подмножества элементов текущего разделителя. Для метода split
действует соглашение, что он возвращает последовательность разделителей, перебирающих непересекающиеся подмножества элементов текущего разделителя, ни одно из которых не является пустым. Если текущий разделитель покрывает один или менее элементов, split
возвращает саму последовательность этого разделителя. Метод psplit
должен возвращать последовательность разделителей, перебирающих точно такое количество элементов, которое задано значениями размеров, указанных параметром sizes
. Если параметр sizes
требует отделить меньше элементов, чем покрыто текущим разделителем, то дополнительный разделитель со всеми остальными элементами размещается в конце последовательности. Если в параметре sizes
указано больше элементов, чем содержится в текущем разделителе, для каждого размера, на который не хватило элементов, будет добавлен пустой разделитель. Наконец, вызов split
или psplit
делает текущий разделитель недействительным.
def split = {
val rem = remaining
if (rem >= 2) psplit(rem / 2, rem - rem / 2)
else Seq(this)
}
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
val splitted = new ArrayBuffer[ParStringSplitter]
for (sz <- sizes) {
val next = (i + sz) min ntl
splitted += new ParStringSplitter(s, i, next)
i = next
}
if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
splitted
}
}
}
Выше приведена реализация метода split
посредством вызова psplit
, что часто наиболее оправдано в случае параллельных коллекций. Написать реализацию разделителя для параллельных ассоциативных массивов, множеств или итерируемых объектов чаще всего проще, так как они не требуют реализации метода psplit
.
Итак, мы получили класс параллельных строк. Единственным недостатком является то, что вызов методов трансформации, таких, как filter
, произведет параллельный вектор вместо параллельной строки, что в ряде случаев может оказаться не самым оптимальным решением, так как воссоздание строки из вектора после фильтрации может оказаться затратным.
Параллельные коллекции с компоновщиками
Допустим, мы хотим применить filter
к символам параллельной строки, например, чтобы избавиться от запятых. Как отмечено выше, вызов filter
вернет параллельный вектор, в то время как мы хотим получить строку (так как некоторые интерфейсы используемого API могут требовать последовательную строку).
Чтобы избежать этого, для параллельной строки требуется написать компоновщик. На этот раз мы унаследуем трейт ParSeqLike
, чтобы конкретизировать значение, возвращаемое методом filter
– а именно ParString
вместо ParSeq[Char]
. Третий параметр-тип трейта ParSeqLike
указывает тип последовательного аналога параллельной коллекции (в этом отличие от последовательных трейтов вида *Like
, имеющих только два параметра).
class ParString(val str: String)
extends immutable.ParSeq[Char]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString]
Все методы остаются такими же, как в предыдущем примере, только дополнительно добавляется защищенный метод newCombiner
, который используется при выполнении метода filter
.
protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
Следующим шагом определяем класс ParStringCombiner
. Компоновщики являются подтипами строителей, в которых появляется дополнительный метод combine
, принимающий другой компоновщик как аргумент и возвращающий новый компоновщик, который содержит элементы и текущего и принятого компоновщика. И текущий компоновщик, и компоновщик-аргумент становятся недействительными после вызова combine
. Если передать аргументом сам текущий компоновщик, метод combine
просто вернет его же как результат. Предполагается, что метод должен быть эффективным, то есть в худшем случае требовать для выполнения логарифмического времени по отношению к количеству элементов, так как в ходе параллельного вычисления он вызывается большое количество раз.
Наш ParStringCombiner
будет содержать последовательность строителей строк. Он будет реализовывать +=
путем добавления элемента к последнему строителю строки в последовательности, и combine
конкатенацией списков строителей строк текущего компоновщика и компоновщика-аргумента. Метод result
, вызываемый в конце параллельного вычисления, произведет параллельную строку соединив все строители строк вместе. Таким образом, элементы копируются только один раз в конце, а не каждый раз, когда вызывается метод combine
. В идеале, мы должны подумать о том, чтобы еще и копирование проводить параллельно (именно так и происходит в случае параллельных массивов), но без погружения в детали внутреннего представления строк это лучшее, чего мы можем добиться– остается смириться с этим последовательным узким местом.
private class ParStringCombiner extends Combiner[Char, ParString] {
var sz = 0
val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
var lastc = chunks.last
def size: Int = sz
def +=(elem: Char): this.type = {
lastc += elem
sz += 1
this
}
def clear = {
chunks.clear
chunks += new StringBuilder
lastc = chunks.last
sz = 0
}
def result: ParString = {
val rsb = new StringBuilder
for (sb <- chunks) rsb.append(sb)
new ParString(rsb.toString)
}
def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
val that = other.asInstanceOf[ParStringCombiner]
sz += that.sz
chunks ++= that.chunks
lastc = chunks.last
this
}
}
Как реализовать собственный компоновщик?
Тут нет стандартного рецепта, – все зависит от имеющейся структуры данных, и обычно требует изобретательности со стороны того, кто пишет реализацию. Тем не менее, можно выделить несколько подходов, которые обычно применяются:
-
Конкатенация и объединение. Некоторые структуры данных позволяют реализовать эти операции эффективно (обычно с логарифмической сложностью), и если требуемая коллекция представлена такой структурой данных, ее компоновщик может быть самой такой коллекцией. Особенно хорошо этот подход работает для подвешенных деревьев (finger trees), веревок (ropes) и различных видов куч.
-
Двухфазное выполнение. Подход, применяемый в случае параллельных массивов и параллельных хэш-таблиц; он предполагает, что элементы могут быть эффективно рассортированы по готовым для конкатенации блокам, из которых результирующая структура данных может быть построена параллельно. В первую фазу блоки заполняются независимо различными процессорами, и в конце просто соединяются конкатенацией. Во вторую фазу происходит выделение памяти для целевой структуры данных, и после этого различные процессоры заполняют различные ее части, используя элементы непересекающихся блоков. Следует принять меры для того, чтобы различные процессоры никогда не изменяли одну и ту же часть структуры данных, иначе не избежать трудноуловимых, связанных с многопоточностью ошибок. Такой подход легко применить к последовательностям с произвольным доступом, как было показано в предыдущем разделе.
-
Многопоточная структура данных. Так как последние два подхода, в сущности, не требуют использования примитивных механизмов синхронизации, предполагается, что структура будет строиться несколькими потоками так, что два различных процессора никогда не будут изменять одну и ту же область памяти. Существует большое количество многопоточных структур данных, которые могут безопасно изменяться несколькими процессорами одновременно, среди таких можно упомянуть многопоточные списки с пропусками (skip lists), многопоточные хэш-таблицы,
split-ordered
списки и многопоточные АВЛ-деревья. При этом требуется следить, чтобы у выбранной многопоточной структуры был горизонтально масштабируемый метод вставки. У многопоточных параллельных коллекций компоновщик может быть представлен самой коллекцией, и единственный его экземпляр обычно используется всеми процессорами, занятыми в выполнении параллельной операции.
Интеграция с фреймворком коллекций
Наш класс ParString
оказался не вполне завершен: несмотря на то, что мы реализовали собственный компоновщик, который будут использовать такие методы, как filter
, partition
, takeWhile
или span
, большинство методов трансформации требуют скрытый параметр-доказательство CanBuildFrom
(подробное объяснение можно посмотреть в “Scala collections guide” (прим. перев. скорее /overviews/core/architecture-of-scala-collections.html)). Чтобы обеспечить его доступность и тем самым полностью интегрировать наш класс ParString
с фреймворком коллекций, требуется примешать дополнительный трейт GenericParTemplate
и определить объект-компаньон для ParString
.
class ParString(val str: String)
extends immutable.ParSeq[Char]
with GenericParTemplate[Char, ParString]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {
def companion = ParString
Внутрь объекта-компаньона помещаем скрытый параметр-доказательство CanBuildFrom
.
object ParString {
implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
new CanCombinerFrom[ParString, Char, ParString] {
def apply(from: ParString) = newCombiner
def apply() = newCombiner
}
def newBuilder: Combiner[Char, ParString] = newCombiner
def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
def apply(elems: Char*): ParString = {
val cb = newCombiner
cb ++= elems
cb.result
}
}
Дальнейшие настройки– многопоточные и другие коллекции
Процесс реализации многопоточной коллекции (в отличие от параллельных, многопоточные коллекции могут подобно collection.concurrent.TrieMap
изменяться одновременно несколькими потоками) не всегда прост и очевиден. При этом особенно нуждаются в тщательном обдумывании компоновщики. Компоновщики большинства параллельных коллекций, которые были рассмотрены до этого момента, используют двухфазное выполнение. На первом этапе элементы добавляются различными процессорами к своим компоновщикам и последние объединяются вместе. На втором шаге, когда становятся доступными все элементы, строится результирующая коллекция.
Другим подходом является построение результирующей коллекции как структуры элементов компоновщика. Для этого коллекция должна быть потокозащищенной– компоновщик должен позволять выполнить многопоточную вставку элемента. В этом случае один компоновщик может использоваться всеми процессорами.
Если требуется сделать многопоточную коллекцию параллельной, в ее компоновщике нужно перегрузить метод canBeShared
так, чтобы он возвращал true
. Этим мы заставим проверять, что при выполнении параллельной операции создается только один компоновщик. Далее, метод +=
должен быть потокозащищенным. И наконец, метод combine
по-прежнему должен возвращать текущий компоновщик в случае, если он совпадает с аргументом, а в противном случае вполне может выбросить исключение.
Чтобы добиться лучшей балансировки нагрузки, разделители делятся на более мелкие разделители. По умолчанию решение о том, что дальнейшее разделение не требуется, принимается на основе информации, возвращенной методом remaining
. Для некоторых коллекций вызов метода remaining
может быть затратным, и решение о разделении лучше принять другими способами. В этом случае нужно перегрузить метод shouldSplitFurther
разделителя.
В реализации по умолчанию разделитель делится, если число оставшихся элементов больше, чем размер коллекции деленный на взятый восемь раз уровень параллелизма.
def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
remaining > thresholdFromSize(coll.size, parallelismLevel)
Как вариант, разделитель может иметь счетчик количества проведенных над ним разделений и реализовывать метод shouldSplitFurther
, возвращая true
, если количество разделений больше, чем 3 + log(parallelismLevel)
. Это и позволяет избежать вызова метода remaining
.
Более того, если для некоторой коллекции вызов remaining
затратен (то есть требует обработки большого числа элементов), то метод isRemainingCheap
в разделителях следует перегрузить, так, чтобы он возвращал false
.
Наконец, если реализовать метод remaining
в разделителях весьма затруднительно, можно возвращать false
в перегруженном методе isStrictSplitterCollection
соответствующей коллекции. Над такими коллекциями не получится выполнить ряд методов, в частности таких, которые требуют точности разделителей (последнее предполагает как раз, что метод remaining
возвращает правильное значение). Но, что важно, это не относится к методам, используемым для обработки for-включений (for-comprehensions).