Parallel Collections

Создание пользовательской параллельной коллекции

Language

Параллельная коллекция без компоновщиков

Определить параллельную коллекцию без определения ее компоновщика возможно, так же, как возможно определить собственную последовательную коллекцию без определения ее строителей (builders). Вследствие отсутствия компоновщика получится, что методы трансформаций (т.е. map, flatMap, collect, filter, …) по умолчанию будут возвращать ближайшую по иерархии стандартную коллекцию. Например, диапазоны строителей не имеют, и поэтому создание отображения элементов диапазона– map – создает вектор.

В следующем примере определим параллельную коллекцию-строку. Так как строки по сути являются неизменяемыми последовательностями, сделаем их класс наследником immutable.ParSeq[Char]:

class ParString(val str: String)
extends immutable.ParSeq[Char] {

Затем определим методы, которые есть в любой неизменяемой последовательности:

  def apply(i: Int) = str.charAt(i)

  def length = str.length

Кроме того, мы должны решить, что будем возвращать в качестве последовательного аналога нашей параллельной коллекции. Пусть это будет класс WrappedString:

  def seq = new collection.immutable.WrappedString(str)

И наконец, требуется задать разделитель для наших параллельных строк. Назовем его ParStringSplitter и сделаем его потомком разделителя последовательностей, то есть типа SeqSplitter[Char]:

  def splitter = new ParStringSplitter(str, 0, str.length)

  class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
  extends SeqSplitter[Char] {

    final def hasNext = i < ntl

    final def next = {
      val r = s.charAt(i)
      i += 1
      r
    }

В примере выше, ntl отображает общую длину строки, i– текущую позицию, и наконец s– саму строку.

Итераторы (или разделители) параллельных коллекций требуют еще несколько методов помимо next и hasNext, характерных для итераторов последовательных коллекций. Для начала, у них есть метод remaining, возвращающий количество элементов, которые данному разделителю еще предстоит перебрать. Затем, метод dup, дублирующий текущий разделитель.

    def remaining = ntl - i

    def dup = new ParStringSplitter(s, i, ntl)

И наконец, методы split и psplit, которые используются для создания разделителей, перебирающих подмножества элементов текущего разделителя. Для метода split действует соглашение, что он возвращает последовательность разделителей, перебирающих непересекающиеся подмножества элементов текущего разделителя, ни одно из которых не является пустым. Если текущий разделитель покрывает один или менее элементов, split возвращает саму последовательность этого разделителя. Метод psplit должен возвращать последовательность разделителей, перебирающих точно такое количество элементов, которое задано значениями размеров, указанных параметром sizes. Если параметр sizes требует отделить меньше элементов, чем покрыто текущим разделителем, то дополнительный разделитель со всеми остальными элементами размещается в конце последовательности. Если в параметре sizes указано больше элементов, чем содержится в текущем разделителе, для каждого размера, на который не хватило элементов, будет добавлен пустой разделитель. Наконец, вызов split или psplit делает текущий разделитель недействительным.

   def split = {
      val rem = remaining
      if (rem >= 2) psplit(rem / 2, rem - rem / 2)
      else Seq(this)
    }

    def psplit(sizes: Int*): Seq[ParStringSplitter] = {
      val splitted = new ArrayBuffer[ParStringSplitter]
      for (sz <- sizes) {
        val next = (i + sz) min ntl
        splitted += new ParStringSplitter(s, i, next)
        i = next
      }
      if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
      splitted
    }
  }
}

Выше приведена реализация метода split посредством вызова psplit, что часто наиболее оправдано в случае параллельных коллекций. Написать реализацию разделителя для параллельных ассоциативных массивов, множеств или итерируемых объектов чаще всего проще, так как они не требуют реализации метода psplit.

Итак, мы получили класс параллельных строк. Единственным недостатком является то, что вызов методов трансформации, таких, как filter, произведет параллельный вектор вместо параллельной строки, что в ряде случаев может оказаться не самым оптимальным решением, так как воссоздание строки из вектора после фильтрации может оказаться затратным.

Параллельные коллекции с компоновщиками

Допустим, мы хотим применить filter к символам параллельной строки, например, чтобы избавиться от запятых. Как отмечено выше, вызов filter вернет параллельный вектор, в то время как мы хотим получить строку (так как некоторые интерфейсы используемого API могут требовать последовательную строку).

Чтобы избежать этого, для параллельной строки требуется написать компоновщик. На этот раз мы унаследуем трейт ParSeqLike, чтобы конкретизировать значение, возвращаемое методом filter– а именно ParString вместо ParSeq[Char]. Третий параметр-тип трейта ParSeqLike указывает тип последовательного аналога параллельной коллекции (в этом отличие от последовательных трейтов вида *Like, имеющих только два параметра).

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString]

Все методы остаются такими же, как в предыдущем примере, только дополнительно добавляется защищенный метод newCombiner, который используется при выполнении метода filter.

  protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

Следующим шагом определяем класс ParStringCombiner. Компоновщики являются подтипами строителей, в которых появляется дополнительный метод combine, принимающий другой компоновщик как аргумент и возвращающий новый компоновщик, который содержит элементы и текущего и принятого компоновщика. И текущий компоновщик, и компоновщик-аргумент становятся недействительными после вызова combine. Если передать аргументом сам текущий компоновщик, метод combine просто вернет его же как результат. Предполагается, что метод должен быть эффективным, то есть в худшем случае требовать для выполнения логарифмического времени по отношению к количеству элементов, так как в ходе параллельного вычисления он вызывается большое количество раз.

Наш ParStringCombiner будет содержать последовательность строителей строк. Он будет реализовывать += путем добавления элемента к последнему строителю строки в последовательности, и combine конкатенацией списков строителей строк текущего компоновщика и компоновщика-аргумента. Метод result, вызываемый в конце параллельного вычисления, произведет параллельную строку соединив все строители строк вместе. Таким образом, элементы копируются только один раз в конце, а не каждый раз, когда вызывается метод combine. В идеале, мы должны подумать о том, чтобы еще и копирование проводить параллельно (именно так и происходит в случае параллельных массивов), но без погружения в детали внутреннего представления строк это лучшее, чего мы можем добиться– остается смириться с этим последовательным узким местом.

private class ParStringCombiner extends Combiner[Char, ParString] {
  var sz = 0
  val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
  var lastc = chunks.last

  def size: Int = sz

  def +=(elem: Char): this.type = {
    lastc += elem
    sz += 1
    this
  }

  def clear = {
    chunks.clear
    chunks += new StringBuilder
    lastc = chunks.last
    sz = 0
  }

  def result: ParString = {
    val rsb = new StringBuilder
    for (sb <- chunks) rsb.append(sb)
    new ParString(rsb.toString)
  }

  def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
    val that = other.asInstanceOf[ParStringCombiner]
    sz += that.sz
    chunks ++= that.chunks
    lastc = chunks.last
    this
  }
}

Как реализовать собственный компоновщик?

Тут нет стандартного рецепта, – все зависит от имеющейся структуры данных, и обычно требует изобретательности со стороны того, кто пишет реализацию. Тем не менее, можно выделить несколько подходов, которые обычно применяются:

  1. Конкатенация и объединение. Некоторые структуры данных позволяют реализовать эти операции эффективно (обычно с логарифмической сложностью), и если требуемая коллекция представлена такой структурой данных, ее компоновщик может быть самой такой коллекцией. Особенно хорошо этот подход работает для подвешенных деревьев (finger trees), веревок (ropes) и различных видов куч.

  2. Двухфазное выполнение. Подход, применяемый в случае параллельных массивов и параллельных хэш-таблиц; он предполагает, что элементы могут быть эффективно рассортированы по готовым для конкатенации блокам, из которых результирующая структура данных может быть построена параллельно. В первую фазу блоки заполняются независимо различными процессорами, и в конце просто соединяются конкатенацией. Во вторую фазу происходит выделение памяти для целевой структуры данных, и после этого различные процессоры заполняют различные ее части, используя элементы непересекающихся блоков. Следует принять меры для того, чтобы различные процессоры никогда не изменяли одну и ту же часть структуры данных, иначе не избежать трудноуловимых, связанных с многопоточностью ошибок. Такой подход легко применить к последовательностям с произвольным доступом, как было показано в предыдущем разделе.

  3. Многопоточная структура данных. Так как последние два подхода, в сущности, не требуют использования примитивных механизмов синхронизации, предполагается, что структура будет строиться несколькими потоками так, что два различных процессора никогда не будут изменять одну и ту же область памяти. Существует большое количество многопоточных структур данных, которые могут безопасно изменяться несколькими процессорами одновременно, среди таких можно упомянуть многопоточные списки с пропусками (skip lists), многопоточные хэш-таблицы, split-ordered списки и многопоточные АВЛ-деревья. При этом требуется следить, чтобы у выбранной многопоточной структуры был горизонтально масштабируемый метод вставки. У многопоточных параллельных коллекций компоновщик может быть представлен самой коллекцией, и единственный его экземпляр обычно используется всеми процессорами, занятыми в выполнении параллельной операции.

Интеграция с фреймворком коллекций

Наш класс ParString оказался не вполне завершен: несмотря на то, что мы реализовали собственный компоновщик, который будут использовать такие методы, как filter, partition, takeWhile или span, большинство методов трансформации требуют скрытый параметр-доказательство CanBuildFrom (подробное объяснение можно посмотреть в “Scala collections guide” (прим. перев. скорее /overviews/core/architecture-of-scala-collections.html)). Чтобы обеспечить его доступность и тем самым полностью интегрировать наш класс ParString с фреймворком коллекций, требуется примешать дополнительный трейт GenericParTemplate и определить объект-компаньон для ParString.

class ParString(val str: String)
extends immutable.ParSeq[Char]
   with GenericParTemplate[Char, ParString]
   with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {

  def companion = ParString

Внутрь объекта-компаньона помещаем скрытый параметр-доказательство CanBuildFrom.

object ParString {
  implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
    new CanCombinerFrom[ParString, Char, ParString] {
      def apply(from: ParString) = newCombiner
      def apply() = newCombiner
    }

  def newBuilder: Combiner[Char, ParString] = newCombiner

  def newCombiner: Combiner[Char, ParString] = new ParStringCombiner

  def apply(elems: Char*): ParString = {
	val cb = newCombiner
	cb ++= elems
	cb.result
  }
}

Дальнейшие настройки– многопоточные и другие коллекции

Процесс реализации многопоточной коллекции (в отличие от параллельных, многопоточные коллекции могут подобно collection.concurrent.TrieMap изменяться одновременно несколькими потоками) не всегда прост и очевиден. При этом особенно нуждаются в тщательном обдумывании компоновщики. Компоновщики большинства параллельных коллекций, которые были рассмотрены до этого момента, используют двухфазное выполнение. На первом этапе элементы добавляются различными процессорами к своим компоновщикам и последние объединяются вместе. На втором шаге, когда становятся доступными все элементы, строится результирующая коллекция.

Другим подходом является построение результирующей коллекции как структуры элементов компоновщика. Для этого коллекция должна быть потокозащищенной– компоновщик должен позволять выполнить многопоточную вставку элемента. В этом случае один компоновщик может использоваться всеми процессорами.

Если требуется сделать многопоточную коллекцию параллельной, в ее компоновщике нужно перегрузить метод canBeShared так, чтобы он возвращал true. Этим мы заставим проверять, что при выполнении параллельной операции создается только один компоновщик. Далее, метод += должен быть потокозащищенным. И наконец, метод combine по-прежнему должен возвращать текущий компоновщик в случае, если он совпадает с аргументом, а в противном случае вполне может выбросить исключение.

Чтобы добиться лучшей балансировки нагрузки, разделители делятся на более мелкие разделители. По умолчанию решение о том, что дальнейшее разделение не требуется, принимается на основе информации, возвращенной методом remaining. Для некоторых коллекций вызов метода remaining может быть затратным, и решение о разделении лучше принять другими способами. В этом случае нужно перегрузить метод shouldSplitFurther разделителя.

В реализации по умолчанию разделитель делится, если число оставшихся элементов больше, чем размер коллекции деленный на взятый восемь раз уровень параллелизма.

def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
    remaining > thresholdFromSize(coll.size, parallelismLevel)

Как вариант, разделитель может иметь счетчик количества проведенных над ним разделений и реализовывать метод shouldSplitFurther, возвращая true, если количество разделений больше, чем 3 + log(parallelismLevel). Это и позволяет избежать вызова метода remaining.

Более того, если для некоторой коллекции вызов remaining затратен (то есть требует обработки большого числа элементов), то метод isRemainingCheap в разделителях следует перегрузить, так, чтобы он возвращал false.

Наконец, если реализовать метод remaining в разделителях весьма затруднительно, можно возвращать false в перегруженном методе isStrictSplitterCollection соответствующей коллекции. Над такими коллекциями не получится выполнить ряд методов, в частности таких, которые требуют точности разделителей (последнее предполагает как раз, что метод remaining возвращает правильное значение). Но, что важно, это не относится к методам, используемым для обработки for-включений (for-comprehensions).