```python linenums="1"
import sys
import os

sys.path.insert(0, '/opt/spark/python')
sys.path.insert(0, '/opt/spark/python/lib/py4j-0.10.9.7-src.zip')
os.environ['SPARK_HOME'] = '/opt/spark'

import pyspark

conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory", "1g")

sc = pyspark.SparkContext(conf=conf)
```
```python linenums="1"
import numpy as np

np.random.seed(123)
N = 4
M = np.random.randint(5, size=(N, N))
V = np.random.randint(5, size=N)

P = np.matmul(M, V)

print(f"Matrix M:\n {M}")
print(f"Vector V:\n {V}")
print(f"Dot Product P:\n {P}")
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">Matrix M:
 [[2 4 2 1]
 [3 2 3 1]
 [1 0 1 1]
 [0 0 1 3]]
Vector V:
 [4 0 0 4]
Dot Product P:
 [12 16 8 12]
</code></pre>
</details>
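As a quick local sanity check of the arithmetic (a NumPy sketch using the printed values above): each entry of P is the dot product of one row of M with V, e.g. row 0 gives 2·4 + 4·0 + 2·0 + 1·4 = 12.

```python
import numpy as np

# Values copied from the output above
M = np.array([[2, 4, 2, 1],
              [3, 2, 3, 1],
              [1, 0, 1, 1],
              [0, 0, 1, 3]])
V = np.array([4, 0, 0, 4])

# Row 0: 2*4 + 4*0 + 2*0 + 1*4 = 12
print(int(M[0].dot(V)))   # 12
print((M @ V).tolist())   # [12, 16, 8, 12]
```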
- Vector V fits into memory on the driver, so for now only the matrix M is parallelized into RDD objects on the cluster.
```python linenums="1"
mspark = sc.parallelize(M)
print(mspark.take(4))
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[array([2, 4, 2, 1]), array([3, 2, 3, 1]), array([1, 0, 1, 1]), array([0, 0, 1, 3])]
</code></pre>
</details>
- This will not work well, as we no longer have an indicator of row order. We need to provide some additional information as we parallelize our local data.
- [zipWithIndex](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.zipWithIndex.html)
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
print(mspark.take(4))
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(array([2, 4, 2, 1]), 0), (array([3, 2, 3, 1]), 1), (array([1, 0, 1, 1]), 2), (array([0, 0, 1, 3]), 3)]
</code></pre>
</details>
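For intuition, `zipWithIndex` pairs each RDD element with its position. A plain-Python sketch of the same pairing (not the Spark API; a local list stands in for the RDD):

```python
rows = ["a", "b", "c"]

# RDD.zipWithIndex yields (element, index) pairs -- note the element
# comes first and the index second
pairs = [(x, i) for i, x in enumerate(rows)]
print(pairs)  # [('a', 0), ('b', 1), ('c', 2)]
```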
```python linenums="1"
def vectorDot(mRow):
    return mRow.dot(V)

mspark = (sc.parallelize(M)
          .zipWithIndex()
          .map(lambda item: vectorDot(item[0])))

print(f"MapReduce Dot Product:\n {np.array(mspark.take(4))}")
print(f"Dot Product P:\n {P}")
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">MapReduce Dot Product:
 [12 16 8 12]
Dot Product P:
 [12 16 8 12]
</code></pre>
</details>
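The same row-wise map can be emulated locally without a cluster. A minimal sketch, assuming the M and V generated above, where each `(row, index)` pair is mapped to the row's dot product with the locally held copy of V:

```python
import numpy as np

M = np.array([[2, 4, 2, 1],
              [3, 2, 3, 1],
              [1, 0, 1, 1],
              [0, 0, 1, 3]])
V = np.array([4, 0, 0, 4])

# Emulate zipWithIndex: (row, index) pairs
indexed = list(zip(M, range(len(M))))
# Emulate map(lambda item: vectorDot(item[0]))
products = [int(row.dot(V)) for row, _ in indexed]
print(products)  # [12, 16, 8, 12]
```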
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex()
vspark = sc.parallelize(V).zipWithIndex()
print(mspark.take(4))
print(vspark.take(4))
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(array([2, 4, 2, 1]), 0),
(array([3, 2, 3, 1]), 1),
(array([1, 0, 1, 1]), 2),
(array([0, 0, 1, 3]), 3)]
[(np.int64(4), 0), (np.int64(0), 1), (np.int64(0), 2), (np.int64(4), 3)]
</code></pre>
</details>
- We want to use the indices as keys, so swap each `(value, index)` pair into `(index, value)`.
```python linenums="1"
mspark = sc.parallelize(M).zipWithIndex().map(lambda item: (item[1],item[0]))
vspark = sc.parallelize(V).zipWithIndex().map(lambda item: (item[1],item[0]))
print(mspark.take(4))
print(vspark.take(4))
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(0, array([2, 4, 2, 1])),
 (1, array([3, 2, 3, 1])),
 (2, array([1, 0, 1, 1])),
 (3, array([0, 0, 1, 3]))]
[(0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
</code></pre>
</details>
```python linenums="1"
pspark = mspark.union(vspark)
print(pspark.take(8))
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(0, array([2, 4, 2, 1])),
(1, array([3, 2, 3, 1])),
(2, array([1, 0, 1, 1])),
(3, array([0, 0, 1, 3])),
(0, np.int64(4)),
(1, np.int64(0)),
(2, np.int64(0)),
(3, np.int64(4))]
</code></pre>
</details>
- Each of these numpy arrays represents one row of the matrix (see `Initial data`).
- The summation is taken over the products of elements at the same column position in each row.
- If V no longer fits into memory, it can be reasoned that each of these numpy arrays (an entire matrix row) does not fit into memory either.
- We need to extract a column identifier from each numpy array and turn each cell into an individual value, similar to the vector's values.
- This needs to be done prior to the `union`.
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))  # (column, (row, cell value))
    return res
pspark = mspark.flatMap(getPos).union(vspark)
print(pspark.take(20))
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(0, (0, np.int64(2))), (1, (0, np.int64(4))), (2, (0, np.int64(2))), (3, (0, np.int64(1))),
 (0, (1, np.int64(3))), (1, (1, np.int64(2))), (2, (1, np.int64(3))), (3, (1, np.int64(1))),
 (0, (2, np.int64(1))), (1, (2, np.int64(0))), (2, (2, np.int64(1))), (3, (2, np.int64(1))),
 (0, (3, np.int64(0))), (1, (3, np.int64(0))), (2, (3, np.int64(1))), (3, (3, np.int64(3))),
 (0, np.int64(4)), (1, np.int64(0)), (2, np.int64(0)), (3, np.int64(4))]
</code></pre>
</details>
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey()
print(pspark.take(16))
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(0, <pyspark.resultiterable.ResultIterable object at 0xffff885f62c0>),
(1, <pyspark.resultiterable.ResultIterable object at 0xffff885f4d30>),
(2, <pyspark.resultiterable.ResultIterable object at 0xffff885f6a70>),
(3, <pyspark.resultiterable.ResultIterable object at 0xffff885f6ec0>)]
</code></pre>
</details>
- Convert the grouped values into `list` type instead of Spark's `ResultIterable`.
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res
pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list)
print(pspark.take(16))
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(0, [(1, np.int64(3)), (2, np.int64(1)), (3, np.int64(0)), (0, np.int64(2)), np.int64(4)]),
 (1, [(3, np.int64(0)), np.int64(0), (0, np.int64(4)), (1, np.int64(2)), (2, np.int64(0))]),
 (2, [(0, np.int64(2)), (1, np.int64(3)), (3, np.int64(1)), (2, np.int64(1)), np.int64(0)]),
 (3, [(1, np.int64(1)), np.int64(4), (3, np.int64(3)), (0, np.int64(1)), (2, np.int64(1))])]
</code></pre>
</details>
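The `groupByKey().mapValues(list)` combination can be pictured locally as building a dict of lists. A small pure-Python sketch (not the Spark API):

```python
from collections import defaultdict

pairs = [(0, "a"), (1, "b"), (0, "c"), (1, "d")]

# groupByKey().mapValues(list): gather all values under each key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

print(dict(grouped))  # {0: ['a', 'c'], 1: ['b', 'd']}
```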
- Each entry of `pspark` is now a key/value pair: the key is a column index, and the value is a list holding the `(row, cell)` tuples of that matrix column plus the bare vector element for the same column. `arrayMul` first finds the vector element in the list, then multiplies every matrix cell by it, emitting `(row, product)` pairs:
```python linenums="1"
def getPos(t):
    res = []
    for idx, x in np.ndenumerate(t[1]):
        res.append((idx[0], (t[0], x)))
    return res

def arrayMul(t):
    res = []
    vectorVal = 0
    for x in t[1]:
        if type(x) != tuple:
            vectorVal = x
    for x in t[1]:
        if type(x) == tuple:
            res.append((x[0], x[1] * vectorVal))
    return res

pspark = mspark.flatMap(getPos).union(vspark).groupByKey().mapValues(list).flatMap(arrayMul)
print(pspark.take(16))
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">[(3, np.int64(0)), (2, np.int64(4)), (0, np.int64(8)), (1, np.int64(12)),
(1, np.int64(0)), (2, np.int64(0)), (0, np.int64(0)), (3, np.int64(0)),
(3, np.int64(0)), (2, np.int64(0)), (1, np.int64(0)), (0, np.int64(0)),
(0, np.int64(4)), (2, np.int64(4)), (3, np.int64(12)), (1, np.int64(4))]
</code></pre>
</details>
- Final steps:
    - groupByKey: bring the values with the same row index together.
    - mapValues: convert the `ResultIterable` values into `list`.
    - mapValues: apply `sum` on each value list.
```python linenums="1"
dotspark = pspark.groupByKey().mapValues(list).mapValues(sum).map(lambda x: x[1])
print(f"MapReduce Dot Product:\n {np.array(dotspark.take(4))}")
print(f"Dot Product P:\n {P}")
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">MapReduce Dot Product:
 [12 16 8 12]
Dot Product P:
 [12 16 8 12]
</code></pre>
</details>
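The whole column-keyed pipeline can be verified locally without Spark. A plain-Python sketch, assuming the small M and V from above, that mirrors `flatMap(getPos)`, `union`, `groupByKey`, `arrayMul`, and the final per-row sum:

```python
from collections import defaultdict
import numpy as np

M = np.array([[2, 4, 2, 1],
              [3, 2, 3, 1],
              [1, 0, 1, 1],
              [0, 0, 1, 3]])
V = np.array([4, 0, 0, 4])
N = len(V)

# flatMap(getPos): every matrix cell becomes (column, (row, cell))
records = [(c, (r, int(M[r, c]))) for r in range(N) for c in range(N)]
# union(vspark): add the vector's (index, value) pairs
records += [(i, int(v)) for i, v in enumerate(V)]

# groupByKey: collect all values sharing a column key
groups = defaultdict(list)
for key, value in records:
    groups[key].append(value)

# arrayMul + final groupByKey/sum: multiply each cell in a column by
# that column's vector element, accumulating per row index
sums = defaultdict(int)
for col, vals in groups.items():
    vec = next(x for x in vals if not isinstance(x, tuple))
    for x in vals:
        if isinstance(x, tuple):
            sums[x[0]] += x[1] * vec

result = [sums[r] for r in range(N)]
print(result)  # [12, 16, 8, 12]
```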
```python linenums="1"
!wget https://files.grouplens.org/datasets/movielens/ml-32m.zip
!unzip ml-32m.zip
```
- How big of a data set are we talking about?
```python linenums="1"
!ls -lh ml-32m/
```

```python linenums="1"
ratings = sc.textFile(working_dir + "/ml-32m/ratings.csv")
print(ratings.take(5))
ratings.cache()
```
- First count, not yet loaded into memory
```python linenums="1"
%%time
ratings.count()
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">CPU times: user 252 ms, sys: 28.2 ms, total: 280 ms
Wall time: 1min 40s
32000205
</code></pre>
</details>
- Second count
```python linenums="1"
%%time
ratings.count()
```
<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
Wall time: 58.4 s
32000205
</code></pre>
</details>
- Third count, preloaded
```python linenums="1"
%%time
ratings.count()
```

<details class="details details--default" data-variant="default"><summary>Output</summary>
<pre><code class="language-output">CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
Wall time: 58.4 s
32000205
</code></pre>
</details>