(Day 335) New content from Zach Wilson's free YT bootcamp

Ivan Ivanov · December 1, 2024

Hello :) Today is Day 335!

A quick summary of today:

  • spark lectures + labs by Zach Wilson

Spark by Zach Wilson 1 2

Here are my notes from the two lectures

Spark week3 P1 Spark week3 P2 Spark week3 P3 Spark week3 P4 Spark week3 P5 Spark week3 P6 Spark week3 P7

Then I decided to do the homework. Doing the homework did not really require watching the lectures but I wanted to watch the lectures first anyway ~

There are probably better ways to do this than locally, but for now it’s fine. And also I was able to confim some of the results with another student

In each code block below, the 1st line is the asked task:

(it’s 5 different scripts as I think that’s how they will need to be submitted, but can easily put it in 1 if needed)

# 1. Disabled automatic broadcast join
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .appName("Jupyter") \
                    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
                    .getOrCreate()
# 2. Explicitly broadcast JOINs medals and maps
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
                    .appName("Jupyter") \
                    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
                    .getOrCreate()

medals = spark.read.option('header', 'true').csv('/home/iceberg/data/medals.csv')
medals_matches_players = spark.read.option('header', 'true').csv('/home/iceberg/data/medals_matches_players.csv')
maps = spark.read.option('header', 'true').csv('/home/iceberg/data/maps.csv')
matches = spark.read.option('header', 'true').csv('/home/iceberg/data/matches.csv')

medals_joined = medals_matches_players.join(F.broadcast(medals), medals_matches_players.medal_id == medals.medal_id)
maps_joined = matches.join(F.broadcast(maps), maps.mapid == matches.mapid)
# 3. Bucket join match_details, matches, and medal_matches_players on match_id with 16 buckets
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
                    .appName("Jupyter") \
                    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
                    .getOrCreate()

matches = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("../data/matches.csv")

matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
        .write.mode("append") \
        .partitionBy("completion_date") \
        .bucketBy(16, "match_id") \
        # .saveAsTable("matches_bucketed")

match_details = spark.read.option("header", "true") \
                          .option("inferSchema", "true") \
                          .csv("../data/match_details.csv")

medals_matches_players = spark.read.option("header", "true") \
                                  .option("inferSchema", "true") \
                                  .csv("../data/medals_matches_players.csv")

match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
             .write.mode("append") \
             .bucketBy(16, "match_id") \
        #      .saveAsTable("match_details_bucketed")

medals_matches_players.select("match_id", "medal_id", "count") \
                     .write.mode("append") \
                     .bucketBy(16, "match_id") \
                #      .saveAsTable("medals_matches_players_bucketed")

join1 = matches.alias("m").join(
    match_details.alias("md"), 
    on=F.col("m.match_id") == F.col("md.match_id"), 
    how="inner"
)

final_join = join1.join(
    medals_matches_players.alias("mmp"), 
    on=F.col("m.match_id") == F.col("mmp.match_id"), 
    how="inner"
)

final_join = final_join.select(
    F.col("m.match_id").alias("matches_match_id"),
    F.col("m.mapid"),
    F.col("m.is_team_game"),
    F.col("m.playlist_id"),
    F.col("m.completion_date"),
    F.col("md.player_gamertag"),
    F.col("md.player_total_kills"),
    F.col("md.player_total_deaths"),
    F.col("mmp.medal_id"),
    F.col("mmp.count")
)
# 4. Aggregate the joined data frame to figure out questions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
                    .appName("Jupyter") \
                    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
                    .getOrCreate()

matches = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("../data/matches.csv")

matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
        .write.mode("append") \
        .partitionBy("completion_date") \
        .bucketBy(16, "match_id") \
        # .saveAsTable("matches_bucketed")

match_details = spark.read.option("header", "true") \
                          .option("inferSchema", "true") \
                          .csv("../data/match_details.csv")

medals_matches_players = spark.read.option("header", "true") \
                                  .option("inferSchema", "true") \
                                  .csv("../data/medals_matches_players.csv")

match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
             .write.mode("append") \
             .bucketBy(16, "match_id") \
        #      .saveAsTable("match_details_bucketed")

medals_matches_players.select("match_id", "medal_id", "count") \
                     .write.mode("append") \
                     .bucketBy(16, "match_id") \
                #      .saveAsTable("medals_matches_players_bucketed")

join1 = matches.alias("m").join(
    match_details.alias("md"), 
    on=F.col("m.match_id") == F.col("md.match_id"), 
    how="inner"
)

final_join = join1.join(
    medals_matches_players.alias("mmp"), 
    on=F.col("m.match_id") == F.col("mmp.match_id"), 
    how="inner"
)

final_join = final_join.select(
    F.col("m.match_id").alias("matches_match_id"),
    F.col("m.mapid"),
    F.col("m.is_team_game"),
    F.col("m.playlist_id"),
    F.col("m.completion_date"),
    F.col("md.player_gamertag"),
    F.col("md.player_total_kills"),
    F.col("md.player_total_deaths"),
    F.col("mmp.medal_id"),
    F.col("mmp.count")
)

# Which player averages the most kills per game?
player_kills = final_join.groupBy("md.player_gamertag").agg(
    F.avg("md.player_total_kills").alias("avg_kills_per_game")
)

player_kills_sorted = player_kills.orderBy(F.desc("avg_kills_per_game"))

player_kills_sorted.show(1)

# Which playlist gets played the most?
playlist_counts = final_join.groupBy("m.playlist_id").agg(
    F.count("matches_match_id").alias("matches_played")
)

playlist_sorted = playlist_counts.orderBy(F.desc("matches_played"))

playlist_sorted.show(1)

# Which map gets played the most?
map_counts = final_join.groupBy("m.mapid").agg(
    F.count("matches_match_id").alias("matches_played")
)

map_sorted = map_counts.orderBy(F.desc("matches_played"))

map_sorted.show(1)
# 5. Try different .sortWithinPartitions to see which has the smallest data size
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
                    .appName("Jupyter") \
                    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
                    .getOrCreate()

matches = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("../data/matches.csv")

matches.select("match_id", "is_team_game", "playlist_id", "completion_date", "mapid") \
        .write.mode("append") \
        .partitionBy("completion_date") \
        .bucketBy(16, "match_id") \
        # .saveAsTable("matches_bucketed")

match_details = spark.read.option("header", "true") \
                          .option("inferSchema", "true") \
                          .csv("../data/match_details.csv")

medals_matches_players = spark.read.option("header", "true") \
                                  .option("inferSchema", "true") \
                                  .csv("../data/medals_matches_players.csv")

match_details.select("match_id", "player_gamertag", "player_total_kills", "player_total_deaths") \
             .write.mode("append") \
             .bucketBy(16, "match_id") \
        #      .saveAsTable("match_details_bucketed")

medals_matches_players.select("match_id", "medal_id", "count") \
                     .write.mode("append") \
                     .bucketBy(16, "match_id") \
                #      .saveAsTable("medals_matches_players_bucketed")

join1 = matches.alias("m").join(
    match_details.alias("md"), 
    on=F.col("m.match_id") == F.col("md.match_id"), 
    how="inner"
)

final_join = join1.join(
    medals_matches_players.alias("mmp"), 
    on=F.col("m.match_id") == F.col("mmp.match_id"), 
    how="inner"
)

final_join = final_join.select(
    F.col("m.match_id").alias("matches_match_id"),
    F.col("m.mapid"),
    F.col("m.is_team_game"),
    F.col("m.playlist_id"),
    F.col("m.completion_date"),
    F.col("md.player_gamertag"),
    F.col("md.player_total_kills"),
    F.col("md.player_total_deaths"),
    F.col("mmp.medal_id"),
    F.col("mmp.count")
)

mapid_playlist_sort_df = final_join.sortWithinPartitions(F.col("m.mapid"), F.col("m.playlist_id"))
matchid_mapid_playlist_sort_df = final_join.sortWithinPartitions(F.col("matches_match_id"), F.col("m.mapid"), F.col("m.playlist_id"))
mapid_playlist_matchid_sort_df = final_join.sortWithinPartitions(F.col("m.mapid"), F.col("m.playlist_id"), F.col("matches_match_id"))
playlist_mapid_matchid_sort_df = final_join.sortWithinPartitions(F.col("m.playlist_id"), F.col("m.mapid"), F.col("matches_match_id"))

final_join.write.mode("overwrite").parquet("final_join")
mapid_playlist_sort_df.write.mode("overwrite").parquet("mapid_playlist_sort")
matchid_mapid_playlist_sort_df.write.mode("overwrite").parquet("matchid_mapid_playlist_sort")
mapid_playlist_matchid_sort_df.write.mode("overwrite").parquet("mapid_playlist_matchid_sort")
playlist_mapid_matchid_sort_df.write.mode("overwrite").parquet("playlist_mapid_matchid_sort")

sc = spark.sparkContext
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

def get_file_size_and_count(path):
    files = fs.globStatus(sc._jvm.org.apache.hadoop.fs.Path(path + "/*"))
    file_sizes = sum(file.getLen() for file in files)
    return file_sizes

final_join_size = get_file_size_and_count("final_join")
mapid_playlist_size = get_file_size_and_count("mapid_playlist_sort")
matchid_mapid_playlist_size = get_file_size_and_count("matchid_mapid_playlist_sort")
mapid_playlist_matchid_size = get_file_size_and_count("mapid_playlist_matchid_sort")
playlist_mapid_matchid_sort_df = get_file_size_and_count("playlist_mapid_matchid_sort")

print(f"final_join: {final_join_size:,} bytes") # 8,087,514 bytes   
print(f"mapid_playlist_sort: {mapid_playlist_size:,} bytes") # 15,453,444 bytes
print(f"matchid_mapid_playlist_sort: {matchid_mapid_playlist_size:,} bytes") # 8,103,554 bytes
print(f"mapid_playlist_matchid_sort: {mapid_playlist_matchid_size:,} bytes") # 7,837,038 bytes
print(f"playlist_mapid_matchid_sort: {playlist_mapid_matchid_sort_df:,} bytes") # 7,847,491 bytes

I hung out a bunch on Zach Wilson’s community discord today as well ~ trying to discover/fix some bugs people (including me as well) had.


That is all for today!

See you tomorrow :)