Hello :) Today is Day 335!
A quick summary of today:
- Spark lectures + labs by Zach Wilson
Spark by Zach Wilson, lectures 1 and 2
Here are my notes from the two lectures.
Then I decided to do the homework. It did not really require watching the lectures first, but I wanted to watch them anyway ~
There are probably better ways to do this than running everything locally, but for now it's fine. I was also able to confirm some of the results with another student.
In each code block below, the first line (a comment) states the assigned task.
(It's 5 separate scripts since I think that's how they will need to be submitted, but they can easily be combined into 1 if needed.)
# 1. Disabled automatic broadcast join
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Jupyter") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
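Setting the threshold to -1 should make Spark skip automatic broadcasting and fall back to sort-merge joins. A tiny sanity check I could add (optional, it just reads the value back from the session):
# optional sanity check: the threshold should read back as "-1" on this session
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))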
# 2. Explicitly broadcast JOINs medals and maps
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("Jupyter") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
medals = spark.read.option('header', 'true').csv('/home/iceberg/data/medals.csv')
medals_matches_players = spark.read.option('header', 'true').csv('/home/iceberg/data/medals_matches_players.csv')
maps = spark.read.option('header', 'true').csv('/home/iceberg/data/maps.csv')
matches = spark.read.option('header', 'true').csv('/home/iceberg/data/matches.csv')
medals_joined = medals_matches_players.join(F.broadcast(medals), medals_matches_players.medal_id == medals.medal_id)
maps_joined = matches.join(F.broadcast(maps), maps.mapid == matches.mapid)
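To make sure the hint is actually used, the physical plans can be printed; with F.broadcast() they should show a BroadcastHashJoin even though the automatic threshold is disabled (just a quick check on the two joined DataFrames above):
# quick check: both plans should show BroadcastHashJoin instead of SortMergeJoin
medals_joined.explain()
maps_joined.explain()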
# 3. Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("Jupyter") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
matches = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/matches.csv")
matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
.write.mode("append") \
.partitionBy("completion_date") \
.bucketBy(16, "match_id") \
# .saveAsTable("matches_bucketed")
match_details = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/match_details.csv")
medals_matches_players = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/medals_matches_players.csv")
match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("match_details_bucketed")
medals_matches_players.select("match_id", "medal_id", "count") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("medals_matches_players_bucketed")
join1 = matches.alias("m").join(
match_details.alias("md"),
on=F.col("m.match_id") == F.col("md.match_id"),
how="inner"
)
final_join = join1.join(
medals_matches_players.alias("mmp"),
on=F.col("m.match_id") == F.col("mmp.match_id"),
how="inner"
)
final_join = final_join.select(
F.col("m.match_id").alias("matches_match_id"),
F.col("m.mapid"),
F.col("m.is_team_game"),
F.col("m.playlist_id"),
F.col("m.completion_date"),
F.col("md.player_gamertag"),
F.col("md.player_total_kills"),
F.col("md.player_total_deaths"),
F.col("mmp.medal_id"),
F.col("mmp.count")
)
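The join above still reads the raw CSV DataFrames, so Spark will shuffle both sides on match_id. To actually benefit from the 16 buckets, the saved tables can be read back from the catalog and joined instead; a minimal sketch, assuming the three *_bucketed tables written above exist (same names as in the saveAsTable calls):
# sketch: join the bucketed tables themselves so Spark can use the buckets
matches_b = spark.table("matches_bucketed")
match_details_b = spark.table("match_details_bucketed")
mmp_b = spark.table("medals_matches_players_bucketed")
bucketed_join = matches_b.alias("m") \
    .join(match_details_b.alias("md"), F.col("m.match_id") == F.col("md.match_id")) \
    .join(mmp_b.alias("mmp"), F.col("m.match_id") == F.col("mmp.match_id"))
# the physical plan should have no Exchange (shuffle) step before the joins
bucketed_join.explain()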
# 4. Aggregate the joined data frame to figure out questions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("Jupyter") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
matches = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/matches.csv")
matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
.write.mode("append") \
.partitionBy("completion_date") \
.bucketBy(16, "match_id") \
# .saveAsTable("matches_bucketed")
match_details = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/match_details.csv")
medals_matches_players = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/medals_matches_players.csv")
match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("match_details_bucketed")
medals_matches_players.select("match_id", "medal_id", "count") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("medals_matches_players_bucketed")
join1 = matches.alias("m").join(
match_details.alias("md"),
on=F.col("m.match_id") == F.col("md.match_id"),
how="inner"
)
final_join = join1.join(
medals_matches_players.alias("mmp"),
on=F.col("m.match_id") == F.col("mmp.match_id"),
how="inner"
)
final_join = final_join.select(
F.col("m.match_id").alias("matches_match_id"),
F.col("m.mapid"),
F.col("m.is_team_game"),
F.col("m.playlist_id"),
F.col("m.completion_date"),
F.col("md.player_gamertag"),
F.col("md.player_total_kills"),
F.col("md.player_total_deaths"),
F.col("mmp.medal_id"),
F.col("mmp.count")
)
# Which player averages the most kills per game?
player_kills = final_join.groupBy("md.player_gamertag").agg(
F.avg("md.player_total_kills").alias("avg_kills_per_game")
)
player_kills_sorted = player_kills.orderBy(F.desc("avg_kills_per_game"))
player_kills_sorted.show(1)
# Which playlist gets played the most?
playlist_counts = final_join.groupBy("m.playlist_id").agg(
F.count("matches_match_id").alias("matches_played")
)
playlist_sorted = playlist_counts.orderBy(F.desc("matches_played"))
playlist_sorted.show(1)
# Which map gets played the most?
map_counts = final_join.groupBy("m.mapid").agg(
F.count("matches_match_id").alias("matches_played")
)
map_sorted = map_counts.orderBy(F.desc("matches_played"))
map_sorted.show(1)
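One caveat I noticed: after joining with medals_matches_players, every (match, player) row is repeated once per medal type, so the plain avg and count above can be skewed by how many medals were earned. A hedged variant that deduplicates (or counts distinct matches) first, using the same column names as the select above:
# variant: average kills over one row per (match, player) instead of per medal row
per_match_player = final_join.dropDuplicates(["matches_match_id", "player_gamertag"])
per_match_player.groupBy("player_gamertag") \
    .agg(F.avg("player_total_kills").alias("avg_kills_per_game")) \
    .orderBy(F.desc("avg_kills_per_game")) \
    .show(1)
# variant: count distinct matches per playlist (the same idea works for mapid)
final_join.groupBy("playlist_id") \
    .agg(F.countDistinct("matches_match_id").alias("matches_played")) \
    .orderBy(F.desc("matches_played")) \
    .show(1)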
# 5. Try different .sortWithinPartitions to see which has the smallest data size
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("Jupyter") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.getOrCreate()
matches = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/matches.csv")
matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
.write.mode("append") \
.partitionBy("completion_date") \
.bucketBy(16, "match_id") \
# .saveAsTable("matches_bucketed")
match_details = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/match_details.csv")
medals_matches_players = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("../data/medals_matches_players.csv")
match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("match_details_bucketed")
medals_matches_players.select("match_id", "medal_id", "count") \
.write.mode("append") \
.bucketBy(16, "match_id") \
# .saveAsTable("medals_matches_players_bucketed")
join1 = matches.alias("m").join(
match_details.alias("md"),
on=F.col("m.match_id") == F.col("md.match_id"),
how="inner"
)
final_join = join1.join(
medals_matches_players.alias("mmp"),
on=F.col("m.match_id") == F.col("mmp.match_id"),
how="inner"
)
final_join = final_join.select(
F.col("m.match_id").alias("matches_match_id"),
F.col("m.mapid"),
F.col("m.is_team_game"),
F.col("m.playlist_id"),
F.col("m.completion_date"),
F.col("md.player_gamertag"),
F.col("md.player_total_kills"),
F.col("md.player_total_deaths"),
F.col("mmp.medal_id"),
F.col("mmp.count")
)
mapid_playlist_sort_df = final_join.sortWithinPartitions(F.col("m.mapid"), F.col("m.playlist_id"))
matchid_mapid_playlist_sort_df = final_join.sortWithinPartitions(F.col("matches_match_id"), F.col("m.mapid"), F.col("m.playlist_id"))
mapid_playlist_matchid_sort_df = final_join.sortWithinPartitions(F.col("m.mapid"), F.col("m.playlist_id"), F.col("matches_match_id"))
playlist_mapid_matchid_sort_df = final_join.sortWithinPartitions(F.col("m.playlist_id"), F.col("m.mapid"), F.col("matches_match_id"))
final_join.write.mode("overwrite").parquet("final_join")
mapid_playlist_sort_df.write.mode("overwrite").parquet("mapid_playlist_sort")
matchid_mapid_playlist_sort_df.write.mode("overwrite").parquet("matchid_mapid_playlist_sort")
mapid_playlist_matchid_sort_df.write.mode("overwrite").parquet("mapid_playlist_matchid_sort")
playlist_mapid_matchid_sort_df.write.mode("overwrite").parquet("playlist_mapid_matchid_sort")
sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
def get_total_file_size(path):
    # add up the sizes of all files Spark wrote under the given output directory
    files = fs.globStatus(sc._jvm.org.apache.hadoop.fs.Path(path + "/*"))
    return sum(file.getLen() for file in files)
final_join_size = get_total_file_size("final_join")
mapid_playlist_size = get_total_file_size("mapid_playlist_sort")
matchid_mapid_playlist_size = get_total_file_size("matchid_mapid_playlist_sort")
mapid_playlist_matchid_size = get_total_file_size("mapid_playlist_matchid_sort")
playlist_mapid_matchid_size = get_total_file_size("playlist_mapid_matchid_sort")
print(f"final_join: {final_join_size:,} bytes") # 8,087,514 bytes
print(f"mapid_playlist_sort: {mapid_playlist_size:,} bytes") # 15,453,444 bytes
print(f"matchid_mapid_playlist_sort: {matchid_mapid_playlist_size:,} bytes") # 8,103,554 bytes
print(f"mapid_playlist_matchid_sort: {mapid_playlist_matchid_size:,} bytes") # 7,837,038 bytes
print(f"playlist_mapid_matchid_sort: {playlist_mapid_matchid_sort_df:,} bytes") # 7,847,491 bytes
I also hung out a bunch on Zach Wilson's community Discord today, trying to track down and fix some bugs people (myself included) had run into ~
That is all for today!
See you tomorrow :)