(Day 322) Data Eng camp homework 1 completed (maybe)

Ivan Ivanov · November 18, 2024

Hello :) Today is Day 322!

A quick summary of today:

  • watched lab 2 - SCD modelling
  • finished the week 1 homework
  • trained a prompt-tuned LLM (it failed to learn anything)
  • streamed

It is 1am and I am very sleepy so this will be short and sweet ~

Data Modeling - Building Slowly Changing Dimensions (SCDs) - Day 2 Lab

The lab 2 video was released today so I watched and took some notes.

Here is my code from the lab:

-- Type 2 slowly changing dimension for players: each row describes one
-- stretch of seasons during which scoring_class and is_active stayed the same.
-- Column order matters: later statements insert into this table positionally.
create table players_scd (
    player_name    text,
    scoring_class  scoring_class,
    is_active      boolean,
    -- a type 2 SCD needs two extra columns: the first and last season of the stretch
    start_season   integer,
    end_season     integer,
    -- season the SCD was generated for (the backfill writes a constant here)
    current_season integer,
    primary key (player_name, start_season)
);

-- Backfill players_scd in a single pass over the players table.
-- For every player we compare each season with the one before it (lag),
-- flag the seasons where a tracked attribute changed, and turn the running
-- count of flags into a streak id. Each (player, streak) becomes one SCD row.
-- The window functions are the big expensive part; scale matters and is
-- something to always consider.
insert into players_scd (
    player_name,
    scoring_class,
    is_active,
    start_season,
    end_season,
    current_season
)
with with_previous as (
    -- attach the previous season's values to every season row
    select
        player_name,
        current_season,
        scoring_class,
        is_active,
        lag(scoring_class, 1)
            over (partition by player_name order by current_season) as previous_scoring_class,
        lag(is_active, 1)
            over (partition by player_name order by current_season) as previous_is_active
    from players
    where current_season <= 2022
),
with_indicators as (
    -- 1 when either attribute differs from the previous season.
    -- "is distinct from" treats null as a comparable value, so a change to or
    -- from null is detected as well. A player's first season is flagged too,
    -- which only shifts the streak numbering and not the resulting rows.
    select
        *,
        case
            when scoring_class is distinct from previous_scoring_class
                or is_active is distinct from previous_is_active then 1
            else 0
        end as change_indicator
    from with_previous
),
with_streaks as (
    -- running total of change flags = id of the streak a season belongs to
    select
        *,
        sum(change_indicator)
            over (partition by player_name order by current_season) as streak_identifier
    from with_indicators
)
select
    player_name,
    scoring_class,
    is_active,
    min(current_season) as start_season,
    max(current_season) as end_season,
    2022 as current_season -- season this backfill was generated for
from with_streaks
group by player_name, streak_identifier, is_active, scoring_class
order by player_name, streak_identifier;


-- Composite type holding the SCD attributes of one streak. The incremental
-- query casts row(...) values to it so two records fit into a single array.
create type scd_type as (
    scoring_class scoring_class,
    is_active     boolean,
    start_season  integer,
    end_season    integer
);

-- Incremental SCD build: see what changed between the 2022 SCD and the 2023
-- season instead of recomputing everything.
-- The backfill processes all seasons at once; this only reads the latest SCD
-- snapshot plus one new season (probably around 20x less data).
-- Comparisons use "is [not] distinct from", so rows whose scoring_class or
-- is_active is null are classified as changed/unchanged instead of being
-- silently dropped.
-- NOTE(review): a player with an open 2022 streak but no 2023 row in players
-- appears in none of the branches below; this presumably relies on players
-- being cumulative (one row per player per season) - confirm.
with last_season_scd as (
    -- streaks that were still open at the end of 2022
    select
        player_name,
        scoring_class,
        is_active,
        start_season,
        end_season
    from players_scd
    where current_season = 2022
        and end_season = 2022
),
historical_scd as (
    -- streaks that closed before 2022; carried over untouched
    select
        player_name,
        scoring_class,
        is_active,
        start_season,
        end_season
    from players_scd
    where current_season = 2022
        and end_season < 2022
),
this_seasons_data as (
    select
        player_name,
        scoring_class,
        is_active,
        current_season
    from players
    where current_season = 2023
),
unchanged_records as (
    -- same attributes as last season: extend the open streak by one season
    select
        ts.player_name,
        ts.scoring_class,
        ts.is_active,
        ls.start_season,
        ts.current_season as end_season
    from this_seasons_data ts
    inner join last_season_scd ls
        on ts.player_name = ls.player_name
    where ts.scoring_class is not distinct from ls.scoring_class
        and ts.is_active is not distinct from ls.is_active
),
changed_records as (
    -- attributes differ: emit the old streak as it was plus a new
    -- one-season streak, packed into an array and unnested into two rows
    select
        ts.player_name,
        unnest(array[
            row(
                ls.scoring_class,
                ls.is_active,
                ls.start_season,
                ls.end_season
            )::scd_type,
            row( -- new record is just 1 value
                ts.scoring_class,
                ts.is_active,
                ts.current_season,
                ts.current_season
            )::scd_type
        ]) as records
    from this_seasons_data ts
    inner join last_season_scd ls
        on ts.player_name = ls.player_name
    where ts.scoring_class is distinct from ls.scoring_class
        or ts.is_active is distinct from ls.is_active
),
unnested_changed_records as (
    -- expand the composite value back into plain columns
    select
        player_name,
        (records).scoring_class,
        (records).is_active,
        (records).start_season,
        (records).end_season
    from changed_records
),
new_records as (
    -- players with no open streak in last season's SCD
    select
        ts.player_name,
        ts.scoring_class,
        ts.is_active,
        ts.current_season as start_season,
        ts.current_season as end_season
    from this_seasons_data ts
    left join last_season_scd ls
        on ts.player_name = ls.player_name
    where ls.player_name is null
)

select player_name, scoring_class, is_active, start_season, end_season
from historical_scd
union all
select player_name, scoring_class, is_active, start_season, end_season
from unchanged_records
union all
select player_name, scoring_class, is_active, start_season, end_season
from unnested_changed_records
union all
select player_name, scoring_class, is_active, start_season, end_season
from new_records;

-- inspect the contents of the SCD table
select
    player_name,
    scoring_class,
    is_active,
    start_season,
    end_season,
    current_season
from players_scd;

And then based on this I completed all the homework:

-- 1. DDL for actors table

-- One film an actor appeared in, as stored inside actors.films.
create type films as (
    film   text,
    votes  integer,
    rating real,
    filmid text
);

-- Rating bucket assigned to an actor by the cumulative query.
create type quality_class as enum (
    'star',
    'good',
    'average',
    'bad'
);

-- One row per actor per year (see the primary key); films is an array of
-- the composite type above.
create table actors (
    actorid       text,
    actor         text,
    films         films[],
    quality_class quality_class,
    current_year  integer,
    is_active     boolean,
    primary key (actorid, current_year)
);

-- 2. Cumulative table generation query
-- Builds actors one year at a time. Each iteration full-outer-joins the
-- previous year's snapshot (yesterday) with the films of the target year
-- (today_aggregated) and inserts the target year's snapshot.
-- Previous years run 1969..2020, so target years run 1970..2021.
do $$
declare
    t_year int; -- target year built in the current iteration
begin
    -- y_year (the previous year) is declared implicitly by the integer for
    -- loop, so it needs no entry in the declare section.
    for y_year in 1969..2020 loop
        t_year := y_year + 1;

        insert into actors (
            actorid,
            actor,
            films,
            quality_class,
            current_year,
            is_active
        )
        with yesterday as (
            select
                actorid,
                actor,
                films,
                quality_class,
                current_year
            from actors
            where current_year = y_year
        ),
        today_aggregated as (
            -- collapse the target year's films into one row per actor
            select
                actorid,
                max(actor) as actor,
                array_agg(row(film, votes, rating, filmid)::films) as films,
                max(year) as year,
                avg(rating) as average_rating
            from actor_films
            where year = t_year
            group by actorid
        )
        select
            coalesce(t.actorid, y.actorid) as actorid,
            coalesce(t.actor, y.actor) as actor,
            -- append this year's films to the history, if there is any of either
            case
                when y.films is null then t.films
                when t.films is not null then y.films || t.films
                else y.films
            end as films,
            -- re-rate from this year's average rating when the actor has
            -- films this year, otherwise carry the previous class forward
            case
                when t.films is not null then
                    (case
                        when t.average_rating > 8 then 'star'
                        when t.average_rating > 7 then 'good'
                        when t.average_rating > 6 then 'average'
                        else 'bad'
                    end)::quality_class
                else y.quality_class
            end as quality_class,
            coalesce(t.year, y.current_year + 1) as current_year,
            -- active = has at least one film in the target year
            t.films is not null as is_active
        from today_aggregated t
        full outer join yesterday y
            on t.actorid = y.actorid;
    end loop;
end $$;

-- 3. DDL for actors_history_scd table
-- Type 2 SCD for actors: each row covers a range of years
-- (start_date..end_date, stored as integer years) for one actor.
-- Column order matters: the backfill inserts into this table positionally.
create table actors_history_scd (
    actorid       text,
    quality_class quality_class,
    is_active     boolean,
    start_date    integer,
    end_date      integer,
    -- year the SCD was generated for (the backfill writes a constant here)
    current_year  integer,
    primary key (actorid, start_date)
);

-- 4. Backfill query for actors_history_scd
-- Compares each actor-year with the previous one (lag), flags the years in
-- which quality_class or is_active changed, and uses the running count of
-- flags as a streak id. Each (actor, streak) becomes one SCD row.
insert into actors_history_scd (
    actorid,
    quality_class,
    is_active,
    start_date,
    end_date,
    current_year
)
with with_previous as (
    -- attach the previous year's values to every actor-year row
    select
        actorid,
        current_year,
        quality_class,
        is_active,
        lag(quality_class, 1)
            over (partition by actorid order by current_year) as previous_quality_class,
        lag(is_active, 1)
            over (partition by actorid order by current_year) as previous_is_active
    from actors
    where current_year <= 2020
),
with_indicators as (
    -- 1 when either attribute differs from the previous year.
    -- "is distinct from" treats null as a comparable value, so a change to or
    -- from null is detected as well. An actor's first year is flagged too,
    -- which only shifts the streak numbering and not the resulting rows.
    select
        *,
        case
            when quality_class is distinct from previous_quality_class
                or is_active is distinct from previous_is_active then 1
            else 0
        end as change_indicator
    from with_previous
),
with_streaks as (
    -- running total of change flags = id of the streak a year belongs to
    select
        *,
        sum(change_indicator)
            over (partition by actorid order by current_year) as streak_identifier
    from with_indicators
)
select
    actorid,
    quality_class,
    is_active,
    min(current_year) as start_date,
    max(current_year) as end_date,
    2020 as current_year -- year this backfill was generated for
from with_streaks
group by actorid, streak_identifier, is_active, quality_class
order by actorid, streak_identifier;

-- 5. Incremental query for actors_history_scd
-- Composite type holding the SCD attributes of one streak. The incremental
-- query casts row(...) values to it so two records fit into a single array.
create type scd_actors_type as (
    quality_class quality_class,
    is_active     boolean,
    start_date    integer,
    end_date      integer
);

-- Incremental SCD build for actors: combine the 2020 SCD with the 2021 rows
-- of actors instead of recomputing the whole history.
-- Comparisons use "is [not] distinct from", so rows whose quality_class or
-- is_active is null are classified as changed/unchanged instead of being
-- silently dropped.
-- NOTE(review): an actor with an open 2020 streak but no 2021 row in actors
-- appears in none of the branches below; this presumably relies on actors
-- being cumulative (one row per actor per year) - confirm.
with last_year_scd as (
    -- streaks that were still open at the end of 2020
    select
        actorid,
        quality_class,
        is_active,
        start_date,
        end_date
    from actors_history_scd
    where current_year = 2020
        and end_date = 2020
),
historical_scd as (
    -- streaks that closed before 2020; carried over untouched
    select
        actorid,
        quality_class,
        is_active,
        start_date,
        end_date
    from actors_history_scd
    where current_year = 2020
        and end_date < 2020
),
this_year_data as (
    select
        actorid,
        quality_class,
        is_active,
        current_year
    from actors
    where current_year = 2021
),
unchanged_records as (
    -- same attributes as last year: extend the open streak by one year
    select
        ty.actorid,
        ty.quality_class,
        ty.is_active,
        ly.start_date,
        ty.current_year as end_date
    from this_year_data ty
    inner join last_year_scd ly
        on ty.actorid = ly.actorid
    where ty.quality_class is not distinct from ly.quality_class
        and ty.is_active is not distinct from ly.is_active
),
changed_records as (
    -- attributes differ: emit the old streak as it was plus a new
    -- one-year streak, packed into an array and unnested into two rows
    select
        ty.actorid,
        unnest(array[
            row(
                ly.quality_class,
                ly.is_active,
                ly.start_date,
                ly.end_date
            )::scd_actors_type,
            row( -- new record is just 1 value
                ty.quality_class,
                ty.is_active,
                ty.current_year,
                ty.current_year
            )::scd_actors_type
        ]) as records
    from this_year_data ty
    inner join last_year_scd ly
        on ty.actorid = ly.actorid
    where ty.quality_class is distinct from ly.quality_class
        or ty.is_active is distinct from ly.is_active
),
unnested_changed_records as (
    -- expand the composite value back into plain columns
    select
        actorid,
        (records).quality_class,
        (records).is_active,
        (records).start_date,
        (records).end_date
    from changed_records
),
new_records as (
    -- actors with no open streak in last year's SCD
    select
        ty.actorid,
        ty.quality_class,
        ty.is_active,
        ty.current_year as start_date,
        ty.current_year as end_date
    from this_year_data ty
    left join last_year_scd ly
        on ty.actorid = ly.actorid
    where ly.actorid is null
),
all_records as (
    select actorid, quality_class, is_active, start_date, end_date
    from historical_scd
    union all
    select actorid, quality_class, is_active, start_date, end_date
    from unchanged_records
    union all
    select actorid, quality_class, is_active, start_date, end_date
    from unnested_changed_records
    union all
    select actorid, quality_class, is_active, start_date, end_date
    from new_records
)
select
    actorid,
    quality_class,
    is_active,
    start_date,
    end_date
from all_records;

I hung out in the discord looking at other people’s questions and problems ~

I was in the lab today

Today I figured out how to do prompt tuning. I used llama-3.2-1b and ran a training that took ~4 hours, but at the end it seems the model did not learn anything. I suspect this is because of the training arguments I passed. Before I left, I started a 9-hour training run and left it to go overnight, so I will check the results tomorrow.

Streamed

I watched 2 talks on how Riot Games does data engineering and uses data to enhance the player experience. They were very nice (at least the 2nd one; the 1st one was a bit too vague and short).

Then I had a quick look at sktime - a library for time-series forecasting based on sklearn


That is all for today!

See you tomorrow :)