# Data Parallel Computing with Spark

## 1. Initial preparation

```python linenums="1"
import sys
import os

sys.path.insert(0, '/opt/spark/python')
sys.path.insert(0, '/opt/spark/python/lib/py4j-0.10.9.7-src.zip')
os.environ['SPARK_HOME'] = '/opt/spark'

import pyspark

conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory", "1g")

sc = pyspark.SparkContext(conf=conf)
```

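A trivial job can serve as a quick sanity check that the new context can reach the cluster (a minimal sketch; any small computation would do):

```python linenums="1"
# Square the integers 0..99 on the workers and sum the results on the driver.
# A successful run returns 328350 and confirms the SparkContext is usable.
print(sc.parallelize(range(100)).map(lambda x: x * x).sum())
```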
---

## 2. Matrix-vector multiplication

### 2.1. Initial data

```python linenums="1"
import numpy as np
np.random.seed(123)
N = 4
M = np.random.randint(5, size = (N, N))
V = np.random.randint(5, size = N)

P = np.matmul(M, V)

# Print the matrices
print(f"Matrix M:\n {M}")
print(f"Vector N:\n {V}")
print(f"Dot Product P:\n {P}")

???note "Output"

    ```output
    Matrix M:
     [[2 4 2 1]
     [3 2 3 1]
     [1 0 1 1]
     [0 0 1 3]]
    Vector V:
     [4 0 0 4]
    Dot Product P:
     [12 16 8 12]
    ```

!!!warning "Initial assumption"
    - Vector V fits into memory
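
For reference, each entry of the product computed above is a per-row dot product; this is the computation we will parallelize:

$$
P_i = \sum_{j=0}^{N-1} M_{ij}\,V_j, \qquad i = 0, \dots, N-1
$$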

### 2.2. Moving data from the notebook's memory into the Spark cluster

```python linenums="1"
mspark = sc.parallelize(M)
print(mspark.take(4))
```

???note "Output"
    ```output
    [array([2, 4, 2, 1]), array([3, 2, 3, 1]), array([1, 0, 1, 1]), array([0, 0, 1, 3])]
    ```

- This will not work well, as we no longer have an indicator of row order. We need to provide some additional information as we parallelize our local data. 
    - [zipWithIndex](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.zipWithIndex.html)

```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
print(mspark.take(4))
```

???note "Output"

    ```output
    [(array([2, 4, 2, 1]), 0), (array([3, 2, 3, 1]), 1), (array([1, 0, 1, 1]), 2), (array([0, 0, 1, 3]), 3)]
    ```

### 2.3. Direct multiplication

```python linenums="1"
def vectorDot(mRow):
    return mRow.dot(V)

mspark = sc.parallelize(M).zipWithIndex() \
    .map(lambda item: vectorDot(item[0]))

print(f"MapReduce Dot Product:\n {np.array(mspark.take(4))}")
print(f"Dot Product P:\n {P}")
```

???note "Output"

    ```output
    MapReduce Dot Product:
    [12 16 8 12]
    Dot Product P:
    [12 16 8 12]
    ```

!!!warning "Remove assumption"
    - Vector V no longer fits into memory

### 2.4. Data processing for multiplication

- Both matrix M and vector V are loaded onto Spark

```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
vspark = sc.parallelize(V).zipWithIndex()
print(mspark.take(4))
print(vspark.take(4))
```

???note "Output"

    ```output
    [(array([2, 4, 2, 1]), 0), 
    (array([3, 2, 3, 1]), 1), 
    (array([1, 0, 1, 1]), 2), 
    (array([0, 0, 1, 3]), 3)]
    [(np.int64(4), 0), (np.int64(0), 1), (np.int64(0), 2), (np.int64(4), 3)]
    ```

- Swap each pair so that the index becomes the key:

```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex().map(lambda item: (item[1], item[0]))
vspark = sc.parallelize(V).zipWithIndex().map(lambda item: (item[1], item[0]))
print(mspark.take(4))
print(vspark.take(4))
```

???note "Output"

    ```output
    [(0, array([2, 4, 2, 1])), 
    (1, array([3, 2, 3, 1])), 
    (2, array([1, 0, 1, 1])), 
    (3, array([0, 0, 1, 3]))]
    [(0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
    ```

- Let's turn RDDs of M and V into a single RDD

```python linenums="1"
pspark = mspark.union(vspark)
print(pspark.take(8))
```

???note "Output"

    ```output
    [(0, array([2, 4, 2, 1])), 
    (1, array([3, 2, 3, 1])), 
    (2, array([1, 0, 1, 1])), 
    (3, array([0, 0, 1, 3])), 
    (0, np.int64(4)), 
    (1, np.int64(0)), 
    (2, np.int64(0)), 
    (3, np.int64(4))]
    ```

- A whole matrix row and a single vector entry sharing a key cannot be combined directly. Instead, explode each row into one pair per cell, keyed by the column index, so that every matrix cell ends up under the same key as the vector entry it must be multiplied with:

```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))  # (column, (row, cell value))
    return res

pspark = mspark.flatMap(getPos).union(vspark)
print(pspark.take(20))
```

???note "Output"

    ```output
    [(0, (0, np.int64(2))), (1, (0, np.int64(4))), (2, (0, np.int64(2))), (3, (0, np.int64(1))), 
    (0, (1, np.int64(3))), (1, (1, np.int64(2))), (2, (1, np.int64(3))), (3, (1, np.int64(1))), 
    (0, (2, np.int64(1))), (1, (2, np.int64(0))), (2, (2, np.int64(1))), (3, (2, np.int64(1))), 
    (0, (3, np.int64(0))), (1, (3, np.int64(0))), (2, (3, np.int64(1))), (3, (3, np.int64(3))), 
    (0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
    ```

- The keys of the above pairs represent the column identifier. 
- If a value is a tuple, its first element is the row identifier and its second element 
is the value of the matrix cell at that row and column; a bare value is the vector entry for that column (see the local illustration below). 
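
As a local illustration (running `getPos` on the driver, outside Spark; this is an aside rather than part of the pipeline), the first indexed row expands into one pair per cell, keyed by the column index:

```python linenums="1"
# Apply getPos to (row index 0, first row of M). Each cell of row 0 = [2 4 2 1]
# is emitted as (column, (row, cell value)): column keys 0..3 paired with
# (0, 2), (0, 4), (0, 2), (0, 1).
print(getPos((0, M[0])))
```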

```python linenums="1"
def getPos(t):
    res = []
    for idx,x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res
    
pspark = mspark.flatMap(getPos).union(vspark).groupByKey()
print(pspark.take(16))
```

???note "Output"

    ```output
    [(0, <pyspark.resultiterable.ResultIterable object at 0xffff885f62c0>), 
    (1, <pyspark.resultiterable.ResultIterable object at 0xffff885f4d30>), 
    (2, <pyspark.resultiterable.ResultIterable object at 0xffff885f6a70>), 
    (3, <pyspark.resultiterable.ResultIterable object at 0xffff885f6ec0>)]
    ```

- `groupByKey` returns lazy `ResultIterable` objects; applying `mapValues(list)` materializes them so the grouped contents can be inspected:

```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list)
print(pspark.take(16))
```

???note "Output"

    ```output
    [(0, [(1, np.int64(3)), (2, np.int64(1)), (3, np.int64(0)), (0, np.int64(2)), np.int64(4)]), 
    (1, [(3, np.int64(0)), np.int64(0), (0, np.int64(4)), (1, np.int64(2)), (2, np.int64(0))]), 
    (2, [(0, np.int64(2)), (1, np.int64(3)), (3, np.int64(1)), (2, np.int64(1)), np.int64(0)]), 
    (3, [(1, np.int64(1)), np.int64(4), (3, np.int64(3)), (0, np.int64(1)), (2, np.int64(1))])]
    ```

- Each element of this `pspark` RDD is a key/value pair:
    - The key is the column index. 
    - The value is a 5-element list: 
        - Four elements come from one column of the matrix M; each is a tuple whose first element is the row index and whose second element is the corresponding matrix cell value. 
        - The remaining element is the value of the vector V at that column index. 
- As shown in the previous output, we cannot assume any particular order of the elements within a group after the union, especially for large-scale data.
- We still need to flatten the final results (a list of tuples instead of a list of lists of tuples), which `flatMap` takes care of below.

```python linenums="1"
def getPos(t):
    res = []
    for idx,x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

def arrayMul(t):
    # t is (column, [mixed list of (row, M[row][column]) tuples and the bare V[column] value])
    res = []
    vectorVal = 0
    # First pass: the non-tuple element is the vector value for this column
    for x in t[1]:
        if type(x) != tuple:
            vectorVal = x
    # Second pass: emit (row, M[row][column] * V[column]) for every matrix cell
    for x in t[1]:
        if type(x) == tuple:
            res.append((x[0], x[1] * vectorVal))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list).flatMap(arrayMul)
print(pspark.take(16))
```

???note "Output"

    ```output
    [(3, np.int64(0)), (2, np.int64(4)), (0, np.int64(8)), (1, np.int64(12)), 
    (1, np.int64(0)), (2, np.int64(0)), (0, np.int64(0)), (3, np.int64(0)), 
    (3, np.int64(0)), (2, np.int64(0)), (1, np.int64(0)), (0, np.int64(0)), 
    (0, np.int64(4)), (2, np.int64(4)), (3, np.int64(12)), (1, np.int64(4))]
    ```

- After the multiplication, the pairs are keyed by row index; summing the partial products per key yields the product vector:

```python linenums="1"
dotspark = pspark.groupByKey().mapValues(list).mapValues(sum).map(lambda x: x[1])
print(f"MapReduce Dot Product:\n {np.array(dotspark.take(4))}")
print(f"Dot Product P:\n {P}")
```

???note "Output"

    ```output
    MapReduce Dot Product:
    [12 16 8 12]
    Dot Product P:
    [12 16 8 12]
    ```
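
As an aside, the same result can also be obtained with `join` and `reduceByKey` instead of `union` and `groupByKey`. The sketch below is only an alternative formulation, not part of the walkthrough above; it reuses `getPos`, `mspark`, and `vspark` as defined earlier, and adds `sortByKey` so the row order of the collected result is explicit rather than incidental:

```python linenums="1"
# Alternative sketch: key matrix cells by column, join with the vector,
# then reduce the partial products by row index.
cells = mspark.flatMap(getPos)                      # (column, (row, M[row][column]))
products = cells.join(vspark) \
    .map(lambda kv: (kv[1][0][0], kv[1][0][1] * kv[1][1]))   # (row, M[row][column] * V[column])
dot = products.reduceByKey(lambda a, b: a + b).sortByKey().map(lambda x: x[1])
print(f"Join-based Dot Product:\n {np.array(dot.take(4))}")
```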

---

## 3. Analyzing text data (not using Spark SQL) 

### 3.1. Getting MovieLens data

- Downloading movie review data

```python linenums="1"
!wget https://files.grouplens.org/datasets/movielens/ml-32m.zip
!unzip ml-32m.zip
```

```python linenums="1"
!ls -lh ml-32m/
```

- You can review the [README](https://files.grouplens.org/datasets/movielens/ml-32m-README.html) of this data set. 

```python linenums="1"
# working_dir is assumed to hold the path where ml-32m.zip was unzipped
ratings = sc.textFile(working_dir + "/ml-32m/ratings.csv")
print(ratings.take(5))
ratings.cache()
```

```python linenums="1"
%%time
ratings.count()
```

???note "Output"

    ```output
    CPU times: user 252 ms, sys: 28.2 ms, total: 280 ms
    Wall time: 1min 40s
    32000205
    ```

- Second count, now served from the cached RDD

```python linenums="1"
%%time
ratings.count()
```

???note "Output"

    ```output
    CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
    Wall time: 58.4 s
    32000205
    ```

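Before attempting the hands-on exercises below, it may help to parse each CSV line into typed fields. This is only a sketch (the helper name `parseRating` is introduced here); it assumes the standard MovieLens `ratings.csv` layout (`userId,movieId,rating,timestamp`) and that the first line is a header that must be filtered out:

```python linenums="1"
import datetime

header = ratings.first()   # the "userId,movieId,rating,timestamp" header line

def parseRating(line):
    # Split a CSV line into (userId, movieId, rating, year the rating was made)
    userId, movieId, rating, timestamp = line.split(",")
    year = datetime.datetime.fromtimestamp(int(timestamp)).year
    return (int(userId), int(movieId), float(rating), year)

parsed = ratings.filter(lambda line: line != header).map(parseRating)
print(parsed.take(3))
```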

???question "Hands-on 1"
    - What are the average ratings over the years for each movie?
    - Note: don't forget to filter!

???question "Hands-on 2"
    - What are the average ratings over the years for each movie?
    - Display the results with movie titles instead of movie IDs
    - Assumption: movies.csv does not fit in memory

???question "Hands-on 3"
    - Identify movies that can be considered cult classics

???question "Hands-on 4"
    - What are the average ratings over the years for each genre?
    - Assumption: movies.csv does not fit in memory