Aleksandar Prokopec, Heather Miller
If you’re using Scala 2.13+ and want to use Scala’s parallel collections, you’ll have to import a separate module, as described here.
Amidst the shift in recent years by processor manufacturers from single to multicore architectures, academia and industry alike have conceded that Popular Parallel Programming remains a formidable challenge.
Parallel collections were included in the Scala standard library in an effort to facilitate parallel programming by sparing users from low-level parallelization details, while providing them with a familiar and simple high-level abstraction. The hope was, and still is, that implicit parallelism behind a collections abstraction will bring reliable parallel execution one step closer to the workflow of mainstream developers.
The idea is simple: collections are a well-understood and frequently used programming abstraction, and their regularity allows them to be parallelized efficiently and transparently. By allowing a user to “swap out” sequential collections for ones that are operated on in parallel, Scala’s parallel collections take a large step forward in bringing parallelism to more code.
Take the following, sequential example, where we perform a monadic operation on some large collection:
val list = (1 to 10000).toList
list.map(_ + 42)
To perform the same operation in parallel, one must simply invoke the par method on the sequential collection, list. After that, one can use a parallel collection in the same way one would normally use a sequential collection. The above example can be parallelized by simply doing the following:
list.par.map(_ + 42)
The design of Scala’s parallel collections library is inspired by and deeply integrated with Scala’s (sequential) collections library (introduced in 2.8). It provides a parallel counterpart to a number of important data structures from Scala’s (sequential) collection library, including:
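- ParArray
- ParVector
- mutable.ParHashMap
- mutable.ParHashSet
- immutable.ParHashMap
- immutable.ParHashSet
- ParRange
- ParTrieMap (collection.concurrent.TrieMaps are new in 2.10)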
In addition to a common architecture, Scala’s parallel collections library shares extensibility with the sequential collections library. That is, like normal sequential collections, users can integrate their own collection types and automatically inherit all the predefined (parallel) operations available on the other parallel collections in the standard library.
To illustrate the generality and utility of parallel collections, we provide a handful of simple example usages, all of which are transparently executed in parallel.
Note: Some of the following examples operate on small collections, which isn’t recommended. They’re provided as examples for illustrative purposes only. As a general heuristic, speed-ups tend to be noticeable when the size of the collection is large, typically several thousand elements. (For more information on the relationship between the size of a parallel collection and performance, please see the appropriate subsection of the performance section of this guide.)
Using a parallel map to transform a collection of String to all-uppercase:
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.map(_.toUpperCase)
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)
Summing via a parallel fold on a ParArray:
scala> val parArray = (1 to 10000).toArray.par
parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ...

scala> parArray.fold(0)(_ + _)
res0: Int = 50005000
Using a parallel filter to select the last names that come alphabetically after the letter “I”:
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)

scala> lastNames.filter(_.head >= 'J')
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)
Creating a Parallel Collection
Parallel collections are meant to be used in exactly the same way as sequential collections; the only noteworthy difference is how one obtains a parallel collection.
Generally, one has two choices for creating a parallel collection:
First, by using the new keyword and a proper import statement:
import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]
Second, by converting from a sequential collection:
val pv = Vector(1,2,3,4,5,6,7,8,9).par
These conversion methods deserve a closer look: sequential collections can be converted to parallel collections by invoking the sequential collection’s par method, and likewise, parallel collections can be converted to sequential collections by invoking the parallel collection’s seq method.
Of Note: Collections that are inherently sequential (in the sense that the elements must be accessed one after the other), like lists, queues, and streams, are converted to their parallel counterparts by copying the elements into a similar parallel collection. An example is List: it is converted into a standard immutable parallel sequence, which is a ParVector. Of course, the copying required for these collection types introduces an overhead not incurred by other collection types, such as Array, Vector, or HashMap.
For more information on conversions on parallel collections, see the conversions and concrete parallel collection classes sections of this guide.
While the parallel collections abstraction feels very much the same as normal sequential collections, it’s important to note that its semantics differ, especially with regard to side effects and non-associative operations.
In order to see how this is the case, first, we visualize how operations are performed in parallel. Conceptually, Scala’s parallel collections framework parallelizes an operation on a parallel collection by recursively “splitting” a given collection, applying an operation on each partition of the collection in parallel, and re-“combining” all the results that were completed in parallel.
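The split, apply, and combine steps described above can be sketched with plain Futures from the standard library. This is only a conceptual illustration, not how the parallel collections framework is implemented: the object name SplitCombineSketch, the parMap helper, and the threshold of 1000 elements are all assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch; names and the cutoff are illustrative assumptions.
object SplitCombineSketch {
  // Below this size a partition is processed sequentially.
  val threshold = 1000

  // Recursively split `xs`, map each partition in its own task,
  // then concatenate the partial results in their original order.
  def parMap[A, B](xs: Vector[A])(f: A => B): Future[Vector[B]] =
    if (xs.length <= threshold) Future(xs.map(f))
    else {
      val (left, right) = xs.splitAt(xs.length / 2)
      // Both halves are started before either result is awaited.
      val l = parMap(left)(f)
      val r = parMap(right)(f)
      // Combine: results of the left half always precede the right half.
      for (lv <- l; rv <- r) yield lv ++ rv
    }

  def run(): Vector[Int] =
    Await.result(parMap((1 to 10000).toVector)(_ + 42), 10.seconds)
}
```

Calling SplitCombineSketch.run() yields the same elements, in the same order, as the sequential (1 to 10000).toVector.map(_ + 42), even though the partitions may be processed at different times.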
These concurrent, “out-of-order” semantics of parallel collections have the following two implications:
- Side-effecting operations can lead to non-determinism
- Non-associative operations lead to non-determinism
Given the concurrent execution semantics of the parallel collections framework, operations performed on a collection which cause side effects should generally be avoided, in order to maintain determinism. A simple example is using an accessor method, like foreach, to increment a var declared outside the closure which is passed to foreach.
scala> var sum = 0
sum: Int = 0

scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.foreach(sum += _); sum
res01: Int = 467766

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res02: Int = 457073

scala> var sum = 0
sum: Int = 0

scala> list.foreach(sum += _); sum
res03: Int = 468520
Here, we can see that each time sum is reinitialized to 0 and foreach is called again on list, sum holds a different value. The source of this non-determinism is a data race: concurrent reads/writes to the same mutable variable.
In the above example, it’s possible for two threads to read the same value in sum, to spend some time doing some operation on that value of sum, and then to attempt to write a new value to sum, potentially resulting in an overwrite (and thus, loss) of a valuable result, as illustrated below:
ThreadA: read value in sum, sum = 0                value in sum: 0
ThreadB: read value in sum, sum = 0                value in sum: 0
ThreadA: increment sum by 760, write sum = 760     value in sum: 760
ThreadB: increment sum by 12, write sum = 12       value in sum: 12
The above example illustrates a scenario where two threads read the same value, 0, before one or the other can sum 0 with an element from their partition of the parallel collection. In this case, ThreadA reads 0 and sums it with its element, 0+760, and ThreadB sums 0 with its element, 0+12. After computing their respective sums, they each write their computed value to sum. Since ThreadA beats ThreadB, it writes first, only for the value in sum to be overwritten shortly after by ThreadB, in effect completely overwriting (and thus losing) the value 760.
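The lost update can be replayed deterministically, without real threads, by performing the reads and writes in the order shown in the interleaving. The sketch below does that, and then shows one possible way to remove the race: making each read-modify-write atomic with java.util.concurrent.atomic.AtomicInteger. The object name LostUpdateSketch, its method names, and the chunk size of 250 are assumptions made for illustration; plain threads stand in for the framework's worker threads.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch; not part of the parallel collections API.
object LostUpdateSketch {
  // Deterministic replay of the interleaving (no real threads involved).
  def replay(): Int = {
    var sum = 0
    val readByA = sum     // ThreadA reads 0
    val readByB = sum     // ThreadB reads 0
    sum = readByA + 760   // ThreadA writes 760
    sum = readByB + 12    // ThreadB writes 12, discarding ThreadA's update
    sum
  }

  // Race-free variant: addAndGet performs the read, add, and write
  // as a single atomic step, so no update can be overwritten.
  def atomicSum(xs: Seq[Int]): Int = {
    val sum = new AtomicInteger(0)
    val threads = xs.grouped(250).toList.map { chunk =>
      new Thread(new Runnable {
        def run(): Unit = chunk.foreach(x => sum.addAndGet(x))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    sum.get
  }
}
```

Here replay() returns 12, matching the final row of the interleaving, while atomicSum(1 to 1000) returns 500500 on every run. In practice a side-effect-free reduction such as fold is usually preferable to shared mutable state, atomic or not.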
Given these “out-of-order” semantics, one must also be careful to perform only associative operations in order to avoid non-determinism. That is, given a parallel collection, pcoll, one should be sure that when invoking a higher-order function on pcoll, such as pcoll.reduce(func), the order in which func is applied to the elements of pcoll can be arbitrary. A simple example is a non-associative operation such as subtraction:
scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…

scala> list.reduce(_-_)
res01: Int = -228888

scala> list.reduce(_-_)
res02: Int = -61000

scala> list.reduce(_-_)
res03: Int = -331818
In the above example, we take a ParVector[Int], invoke reduce, and pass to it _-_, which simply takes two unnamed elements and subtracts the second from the first. Because the parallel collections framework spawns threads which, in effect, independently perform reduce(_-_) on different sections of the collection, two runs of reduce(_-_) on the same collection are not guaranteed to produce the same result.
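The effect of grouping can be seen on a four-element sequential collection, with no parallelism involved. The sketch below compares the left-to-right grouping with one grouping that a split into two halves could produce; the object name GroupingSketch and the choice of split point are assumptions made for illustration.

```scala
// Hypothetical sketch; uses only sequential standard-library collections.
object GroupingSketch {
  val xs = Vector(1, 2, 3, 4)

  // Sequential, left-to-right grouping: ((1 - 2) - 3) - 4
  val sequential: Int = xs.reduceLeft(_ - _)

  // One possible parallel grouping: reduce each half, then combine
  // the two partial results: (1 - 2) - (3 - 4)
  val (left, right) = xs.splitAt(2)
  val splitInHalves: Int = left.reduce(_ - _) - right.reduce(_ - _)

  // The same experiment with an associative operation (addition)
  // gives one answer regardless of grouping.
  val sumSequential: Int = xs.reduceLeft(_ + _)
  val sumSplit: Int = left.reduce(_ + _) + right.reduce(_ + _)
}
```

The subtraction results are -8 and 0 respectively, while both sums are 10. Since the framework is free to choose where to split, any operation whose result depends on the grouping can vary from run to run.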
Note: It is often thought that, like non-associative operations, non-commutative operations passed to a higher-order function on a parallel collection likewise result in non-deterministic behavior. This is not the case. A simple example is string concatenation, an associative but non-commutative operation:
scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par
strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz)

scala> val alphabet = strings.reduce(_++_)
alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz
The “out of order” semantics of parallel collections only means that the operation will be executed out of order (in a temporal sense, that is, non-sequentially); it does not mean that the result will be re-“combined” out of order (in a spatial sense). On the contrary, results will generally be reassembled in order: a parallel collection broken into partitions A, B, C, in that order, will be reassembled once again in the order A, B, C, not in some other arbitrary order like B, C, A.
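The distinction between temporal and spatial order can be illustrated sequentially by reducing three partitions independently and then recombining them. This is a sketch under assumptions: the object name OrderedCombineSketch and the partition size of 3 are chosen for the example, and the rotated order is shown only as a contrast to what the framework does.

```scala
// Hypothetical sketch; uses only sequential standard-library collections.
object OrderedCombineSketch {
  val strings = Vector("abc", "def", "ghi", "jk", "lmnop", "qrs", "tuv", "wx", "yz")

  // Split into partitions A, B, C and reduce each one independently.
  // The order in which the partitions are reduced does not matter.
  val partials: Vector[String] = strings.grouped(3).map(_.reduce(_ + _)).toVector

  // Recombining in partition order A, B, C reproduces the sequential result.
  val inOrder: String = partials.reduce(_ + _)

  // Recombining as B, C, A would give a different string, because
  // concatenation is not commutative.
  val rotated: String = (partials.tail :+ partials.head).reduce(_ + _)
}
```

Here inOrder equals the sequential strings.reduceLeft(_ + _), whereas rotated does not, which is why associativity alone is enough as long as partial results are combined in partition order.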
For more on how parallel collections split and combine operations on different parallel collection types, see the Architecture section of this guide.