Run `git pull` in the big-data-engineering repository.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import sys
import numpy as np
from pyspark.sql import SparkSession
# Seed NumPy's global random generator so the random M and V built below are
# the same on every run.
np.random.seed(123)
def matvec(N: int):
    """Multiply a random N x N matrix by a random vector, serially and on Spark.

    Builds an integer matrix ``M`` and vector ``V`` with ``np.random.randint``,
    computes the reference product ``P`` with NumPy, then repeats the product
    on Spark by distributing the rows of ``M`` and mapping each row to its dot
    product with ``V`` (``V`` is referenced directly from the mapped lambda).
    All results are printed to stdout; nothing is returned.

    Args:
        N: Dimension of the square matrix and length of the vector.
    """
    spark = None
    try:
        spark = SparkSession.builder.appName("Vector and Matrix Multiplication").getOrCreate()
        sc = spark.sparkContext

        M = np.random.randint(5, size=(N, N))
        V = np.random.randint(5, size=N)
        P = np.matmul(M, V)

        # Print the matrices
        print(f"Matrix M:\n {M}")
        print(f"Vector V:\n {V}")
        print(f"Dot Product P:\n {P}")

        # Preview only: show up to the first four distributed rows.
        mspark = sc.parallelize(M)
        print(f"Parallelize M inside Spark Context:\n {mspark.take(4)}")

        # Same rows, now paired with their position as (row, index).
        mspark = sc.parallelize(M).zipWithIndex()
        print(f"Parallelize M inside Spark Context with Index:\n {mspark.take(4)}")

        # item is (row, index); only the row is needed for the dot product.
        mspark = (
            sc.parallelize(M)
            .zipWithIndex()
            .map(lambda item: item[0].dot(V))
        )
        # Collect every entry (not just the first four) so the Spark result
        # can be compared against the full serial product for any N.
        print(f"MapReduce Dot Product:\n {np.array(mspark.collect())}")
        print(f"Serial Dot Product P for comparison purposes:\n {P}")
    except Exception as e:
        # Top-level boundary of the demo: report the failure instead of
        # letting a traceback escape.
        print(f"Spark failed to start: {e}")
    finally:
        # Release the session even when a step above raised.
        if spark is not None:
            spark.stop()
if __name__ == "__main__":
    # The matrix dimension N is the single required command-line argument;
    # exit with a usage message instead of an IndexError traceback.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} N")
    matvec(int(sys.argv[1]))
Run the following command
1
spark-submit --master="local[*]" .\data-parallel\matrix-vector.py 4
Vector V fits into memory
Matrix M is parallelized into RDD objects on the cluster. We then map over each element (row) of mspark and perform a dot product between the element and vector V.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import sys
import numpy as np
from pyspark.sql import SparkSession
# Seed NumPy's global random generator so the random M and V built below are
# the same on every run.
np.random.seed(123)
def getPos(t):
    """Explode one indexed matrix row into per-cell records keyed by column.

    Args:
        t: A ``(row_index, row_values)`` pair, where ``row_values`` is a
            one-dimensional sequence of numbers (one row of the matrix).

    Returns:
        A list with one ``(column_index, (row_index, cell_value))`` tuple per
        cell of the row. The column index is the key so that each cell can
        later be grouped with the vector entry it must be multiplied by.
    """
    row_index = t[0]
    # np.ndenumerate yields (index_tuple, numpy_scalar); .item() turns the
    # scalar into a plain Python number.
    return [
        (idx[0], (row_index, x.item()))  # (column, (row, cell value))
        for idx, x in np.ndenumerate(t[1])
    ]
def arrayMul(t):
    """Multiply every matrix cell in one group by that group's vector value.

    Args:
        t: A ``(key, values)`` pair. ``values`` mixes
            ``(row_index, matrix_value)`` tuples with a bare (non-tuple)
            vector value, in any order. If several non-tuple values are
            present, the last one is used.

    Returns:
        A list of ``(row_index, matrix_value * vector_value)`` partial
        products, one per tuple in ``values``; empty when there are no tuples.

    Raises:
        TypeError: For numeric cells, when the group holds tuples but no
            vector value (the multiplication by ``None`` fails).
    """
    cells = []
    vector_val = None
    # Single pass: classify each value with isinstance so tuple subclasses
    # are consistently treated as matrix cells, never as the vector value.
    for x in t[1]:
        if isinstance(x, tuple):
            cells.append(x)
        else:
            vector_val = x
    return [(cell[0], cell[1] * vector_val) for cell in cells]
def matvec(N: int):
    """Multiply a random N x N matrix by a random vector, cell by cell on Spark.

    Builds an integer matrix ``M`` and vector ``V`` with ``np.random.randint``
    and computes the reference product ``P`` with NumPy. Both ``M`` and ``V``
    are then distributed as RDDs. The matrix is exploded into per-cell records
    keyed by column index (``getPos``), unioned with the vector entries keyed
    by their index, grouped by key, multiplied (``arrayMul``) and summed per
    row with ``reduceByKey``. All results are printed to stdout; nothing is
    returned.

    Args:
        N: Dimension of the square matrix and length of the vector.
    """
    spark = None
    try:
        spark = SparkSession.builder.appName("Vector and Matrix Multiplication").getOrCreate()
        sc = spark.sparkContext

        M = np.random.randint(5, size=(N, N))
        V = np.random.randint(5, size=N)
        P = np.matmul(M, V)

        # Print the matrices
        print(f"Matrix M:\n {M}")
        print(f"Vector V:\n {V}")
        print(f"Dot Product P:\n {P}")

        # (row index, row values as a plain Python list)
        mspark = sc.parallelize(M).zipWithIndex().map(lambda item: (item[1], item[0].tolist()))
        print("Parallelize M inside Spark Context with Index:")
        for item in mspark.collect():
            print(item)

        # (vector index, vector value as a plain Python number)
        vspark = sc.parallelize(V).zipWithIndex().map(lambda item: (item[1], item[0].tolist()))
        print("Parallelize V inside Spark Context with Index:")
        for item in vspark.collect():
            print(item)

        pspark = (
            mspark
            .flatMap(getPos)                   # (column, (row, value)) per cell
            .union(vspark)                     # add (index, vector value) records
            .groupByKey()                      # gather cells and vector value per key
            .mapValues(list)                   # materialize each group as a list
            .flatMap(arrayMul)                 # (row, value * vector value)
            .reduceByKey(lambda a, b: a + b)   # sum the partial products per row
        )
        print("MapReduce Dot Product:")
        # Sort by row index so the output lines up with the serial vector P.
        for item in sorted(pspark.collect()):
            print(item)
        print(f"Serial Dot Product P for comparison purposes:\n {P}")
    except Exception as e:
        # Top-level boundary of the demo: report the failure instead of
        # letting a traceback escape.
        print(f"Spark failed to start: {e}")
    finally:
        # Release the session even when a step above raised.
        if spark is not None:
            spark.stop()
if __name__ == "__main__":
    # The matrix dimension N is the single required command-line argument;
    # exit with a usage message instead of an IndexError traceback.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} N")
    matvec(int(sys.argv[1]))
Run the following command
1
spark-submit --master="local[*]" .\data-parallel\matrix-big-vector.py 16
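`getPos` breaks each matrix row into individual cells keyed by column index, so that the subsequent `groupByKey` brings every cell of a column together with the matching entry of V. `mapValues(list)` converts each group to a Python `list` type instead of Spark's iterables. The output of `groupByKey` is a key/value pair: (column index, [matrix cells of that column and the vector value]). `arrayMul` multiplies each cell by the vector value, and `reduceByKey` sums the partial products for each row.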
1
2
!wget https://files.grouplens.org/datasets/movielens/ml-32m.zip
!unzip ml-32m.zip
1
!ls -lh ml-32m/
1
2
3
# Load the MovieLens ratings CSV as an RDD of text lines.
# NOTE(review): assumes `sc` is the SparkContext and `working_dir` is defined
# in an earlier cell — confirm.
ratings = sc.textFile(working_dir + "/ml-32m/ratings.csv")
# Peek at the first five lines.
print(ratings.take(5))
# Ask Spark to keep the RDD cached once computed, so repeated actions such as
# count() can reuse it.
ratings.cache()
1
2
%%time
ratings.count()
CPU times: user 252 ms, sys: 28.2 ms, total: 280 ms
Wall time: 1min 40s
32000205
1
2
%%time
ratings.count()
CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
Wall time: 58.4 s
32000205
1
2
%%time
ratings.count()
CPU times: user 147 ms, sys: 19.8 ms, total: 167 ms
Wall time: 58.4 s
32000205