SharedArrays.jl

Unlike the distributed DArray from DistributedArrays.jl, a SharedArray object is stored in full on the control process, but it is shared across all workers on the same node, with a significant cache on each worker. The SharedArrays package is part of Julia’s Standard Library (it comes with the language).

There are downsides to SharedArray when compared to DistributedArrays.jl:

  1. The ability to write into the same array elements from multiple processes creates the potential for a race condition and an indeterminate outcome with poorly written code!
  2. You are limited to a set of workers on the same node, since the array lives in that node’s shared memory (you can verify this as shown below).
  3. You will have very skewed (non-uniform across processes) memory usage.
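
For example, a quick sanity check (a minimal sketch, run in a fresh session; `gethostname` comes from the Sockets standard library) is to print the hostname from every process and confirm they all match:

using Distributed, Sockets
addprocs(4)
@everywhere using Sockets
@everywhere println(gethostname())   # all processes should print the same hostname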

Let’s start with serial Julia (julia command) and initialize a 1D shared array:

using Distributed
addprocs(4)
using SharedArrays    # important to run after adding workers
a = SharedArray{Float64}(30);
a[:] .= 1.0           # assign from the control process
@fetchfrom 2 sum(a)   # correct (30.0)
@fetchfrom 3 sum(a)   # correct (30.0)
sum(a)                # correct (30.0)
@sync @spawnat 2 a .= 2.0   # can assign from any worker!
@fetchfrom 2 sum(a)         # correct (60.0)
@fetchfrom 3 sum(a)         # correct (60.0)
sum(a)                      # correct (60.0) on the control process too
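
The same sharing holds at single-element granularity (a small sketch; the index and values here are arbitrary):

@sync @spawnat 3 a[1] = 100.0   # mutate one element from worker 3
a[1]                            # 100.0 on the control process
@fetchfrom 4 a[1]               # 100.0 on worker 4 as well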

You can use a function to initialize an array; however, pay attention to the result:

b = SharedArray{Int64}((1000), init = x -> x .= 0);    # use a function to initialize `b`; `init` receives the array on each worker
length(b)

c = SharedArray{Int64}((1000), init = x -> x .+= 1)   # each worker updates the entire array in-place!
sum(c)   # 4000 with 4 workers: every worker incremented every element once
Key idea: each worker runs this function!
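
To confirm, with the 4 workers added above every element of `c` was incremented once per worker:

unique(c)                          # [4]: each element was incremented by each of the 4 workers
sum(c) == nworkers() * length(c)   # true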

Here is another demo with a problem – this time we try to fill each element with the `myid()` of the worker that writes it:

@everywhere println(myid())     # let's use these IDs in the next function
d = SharedArray{Int64}((20), init = x -> x .= myid())   # indeterminate outcome! each time a new result

Each worker updates every element, but the order in which they do this varies from one run to another, producing indeterminate outcome.
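
You can see the indeterminacy directly by rebuilding the array a few times and printing it (a sketch; the exact values vary from run to run and machine to machine):

for trial in 1:3
    tmp = SharedArray{Int64}((20), init = x -> x .= myid())
    println(collect(tmp))   # a mix of worker IDs, typically different each time
end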

Avoiding a race condition 1: use localindices

With a SharedArray, there is an implicit partitioning for processing on workers (although the array itself is stored on the control process):

@everywhere using SharedArrays   # otherwise `localindices` won't be found on workers
d = SharedArray{Int64}((20), init = x -> x .= myid())   # same racy init as before
for w in workers()
    println(@fetchfrom w localindices(d))   # the block assigned for processing on worker `w`
end
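
With the default even partitioning and the 4 workers above, this typically prints:

1:5
6:10
11:15
16:20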

What we really want is for each worker to fill only its assigned block (parallel init, same result every time):

d = SharedArray{Int64}((20), init = x -> x[localindices(x)] .= myid())
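
Now every run produces the same result; with the 4 workers above (IDs 2–5), each worker fills its own 5-element block:

println(collect(d))   # [2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5]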

Avoiding a race condition 2: use the parallel for loop

Let’s initialize a 2D SharedArray:

n = 100;
a = SharedArray{Float64}(n,n);
@sync @distributed for i in 1:n   # parallel for loop split across all workers; @sync waits for completion
    for j in 1:n
        a[i,j] = myid()       # ID of the worker that initialized this element
    end
end
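
A quick check of how the rows were distributed (a sketch; with 4 workers the default split is typically 25 rows each):

[count(==(w), a[:,1]) for w in workers()]   # e.g. [25, 25, 25, 25]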

for w in workers()
    println(@fetchfrom w localindices(a))   # localindices returns linear (1D) indices, even for a 2D array
end
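
If you want those ranges as 2D coordinates, one option (a sketch) is to map the linear range through `CartesianIndices`:

@fetchfrom 2 CartesianIndices(a)[localindices(a)]   # worker 2's block as (i,j) indices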

for i in a[1:n,1]         # first column = ID of the worker that filled each row
    println(i)
end
a[1:10,1:10]                # on the control process
@fetchfrom 2 a[1:10,1:10]   # on worker 2
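
Because all processes map the same shared memory, the two views are guaranteed to agree:

a[1:10,1:10] == @fetchfrom 2 a[1:10,1:10]   # true: same underlying shared memory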