
Introduction

The term ETL pipeline refers to the process of extracting data (from the cloud, APIs, databases, or files), transforming the raw data, and loading it into a database such as a data warehouse.

I’m going to create an ETL pipeline in Python. Here is the general idea:

  • Extract: pull information about YouTube trending videos from the Youtube Search and Download API. The trending list updates roughly every 15 minutes, and with each update a video may move up, move down, or stay in the same position. In Part 1, I will pull the data once and focus on building the pipeline. In Part 2, I plan to create a scheduled ETL workflow, that is, call the API at regular intervals to get new trending videos.

  • Transform: clean and format the data from the API’s JSON responses into pandas DataFrames.

  • Load: write the data into MySQL on AWS RDS.
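The three stages can be wired together as plain functions, each consuming the previous stage's output. Below is a minimal sketch of that shape; `run_pipeline` is a hypothetical helper, and the toy stages stand in for the post's own `extract_data`, `transform_data`, and `load_data`, which are defined later:

```python
from typing import Callable, TypeVar

Raw = TypeVar("Raw")
Clean = TypeVar("Clean")


def run_pipeline(
    extract: Callable[[], Raw],
    transform: Callable[[Raw], Clean],
    load: Callable[[Clean], None],
) -> Clean:
    """Run one ETL pass (extract, then transform, then load) and return the cleaned data."""
    raw = extract()          # e.g. call the API
    clean = transform(raw)   # e.g. tidy the JSON into a table
    load(clean)              # e.g. append the rows to MySQL
    return clean


# Toy usage with in-memory stand-ins for the real stages (hypothetical data).
sink = []
result = run_pipeline(
    extract=lambda: ["  b ", "a", "a"],
    transform=lambda rows: sorted({r.strip() for r in rows}),
    load=sink.extend,
)
print(result)  # ['a', 'b']
```

Keeping the stages separate means a scheduler can later call the same entry point, and any one stage can be swapped without touching the other two.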

Extraction

Request from the API

Sign up for RapidAPI, go to the Youtube Search and Download API, and click “Subscribe to test”. The API offers several pricing plans; select the free “Basic” one:

Screenshot: selecting the free Basic plan
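The code below imports the RapidAPI key and the MySQL credentials from a local api_keys.py, whose contents the post does not show. One hedged way to write that module is to read the values from environment variables so secrets stay out of version control. The attribute names match the ones the code uses; the environment variable names are assumptions:

```python
# api_keys.py (hypothetical sketch): read secrets from environment variables.
import os

# Attribute names match those used by the extraction and loading code;
# the environment variable names are assumptions.
rapid_api_key = os.environ.get("RAPID_API_KEY", "")
mysql_user = os.environ.get("MYSQL_USER", "")
mysql_pwd = os.environ.get("MYSQL_PWD", "")
aws_endpoint = os.environ.get("AWS_RDS_ENDPOINT", "")   # RDS instance endpoint hostname
mysql_port = int(os.environ.get("MYSQL_PORT", "3306"))  # 3306 is MySQL's default port
mysql_db = os.environ.get("MYSQL_DB", "")
```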

import pandas as pd
import requests
# import the RapidAPI key and the MySQL user/password from a local api_keys.py
import api_keys

def get_video_info(content):
    """Return the video dict from one item of the API response, or None."""
    if "video" in content:
        video = content["video"]
        # drop the thumbnails, which are not needed in the table
        video.pop("thumbnails", None)
        return video
    return None

def extract_data():
    """
    Type of trending videos:
    n - now (default)
    mu - music
    mo - movies
    g - gaming
    """
    querystring = {"type": "n", "hl": "en", "gl": "US"}
    url = "https://youtube-search-and-download.p.rapidapi.com/trending"
    headers = {
        "X-RapidAPI-Key": api_keys.rapid_api_key,
        "X-RapidAPI-Host": "youtube-search-and-download.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring, timeout=30)
    # fail loudly on HTTP errors (e.g. an invalid key or an exceeded quota)
    response.raise_for_status()
    # parse the response body as JSON
    contents = response.json()["contents"]
    # keep only items that actually contain a video (get_video_info returns None otherwise)
    videos = [v for v in (get_video_info(c) for c in contents) if v is not None]
    return pd.DataFrame(videos)

df_videos = extract_data()
df_videos.head()
  | channelId | channelName | lengthText | publishedTimeText | title | videoId | viewCountText
0 | UCOmHUn--16B90oW2L6FRR3A | BLACKPINK | 3:14 | 21 hours ago | BLACKPINK - ‘Pink Venom’ M/V | gQlMMD8auMs | 80,090,195 views
1 | UCCE5NVlm0FhbunMMBT48WAw | Fundy | 15:09 | 11 hours ago | So I Made Minecraft More Satisfying... | 1N22qIy_vHs | 753,860 views
2 | UCJbYdyufHR-cxOuY96KIoqA | AMP | 45:37 | 1 day ago | AMP GOES TO THERAPY | 6M9UG1ueBZU | 943,982 views
3 | UCvj3hNvwrEgTRkeut7_cBAQ | JiDion | 12:15 | 1 day ago | Spending The Day With A Loch Ness Monster Expert! | WRhCmAgEmYs | 1,740,342 views
4 | UCISglu-JC04IuYH13mfYHzw | Dabl | 4:10 | 1 day ago | Undercover Boss Of Checkers Restaurant Is Forc... | h6dL2un7YGg | 944,094 views
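The docstring of extract_data lists four trending categories, but the query string hard-codes "type": "n". A small sketch of a hypothetical trending_params helper that validates the category code before building the query string; the codes come from the docstring, and the hl/gl defaults simply repeat the values used above:

```python
# Category codes taken from the extract_data docstring.
TRENDING_TYPES = {"n": "now", "mu": "music", "mo": "movies", "g": "gaming"}


def trending_params(kind: str = "n", hl: str = "en", gl: str = "US") -> dict:
    """Build the query string for the /trending endpoint (hypothetical helper)."""
    if kind not in TRENDING_TYPES:
        raise ValueError(
            f"unknown trending type {kind!r}; expected one of {sorted(TRENDING_TYPES)}"
        )
    return {"type": kind, "hl": hl, "gl": gl}


print(trending_params("mu"))  # {'type': 'mu', 'hl': 'en', 'gl': 'US'}
```

extract_data could then take a kind argument and pass params=trending_params(kind) to the request, so a typo in the category fails before any API quota is spent.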

Transformation

First of all, we need to remove missing values and duplicates.

We also need to convert lengthText to a length in minutes, publishedTimeText to a datetime, and viewCountText to a number (in millions of views).

Since neither the channel name nor the video title is unique, we keep channelId and videoId as the identifiers. We also record each video’s current rank in the trending list.
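Because videoId is the identifier, duplicates can be judged by it rather than by whole rows, and the rank is just the position in the list. A stdlib-only sketch of that idea on plain dicts (the transform_data function below does something similar with pandas, deduplicating whole rows); the helper name and the sample records are hypothetical:

```python
def rank_unique_videos(videos: list[dict]) -> list[dict]:
    """Keep the first occurrence of each videoId and number the survivors from 1."""
    seen: set[str] = set()
    ranked = []
    for video in videos:
        vid = video.get("videoId")
        if vid is None or vid in seen:
            continue  # skip records without an id and repeated videos
        seen.add(vid)
        ranked.append({"rank": len(ranked) + 1, **video})
    return ranked


sample = [  # hypothetical records
    {"videoId": "a1", "title": "First"},
    {"videoId": "b2", "title": "Second"},
    {"videoId": "a1", "title": "First (repeat)"},
    {"title": "No id"},
]
for row in rank_unique_videos(sample):
    print(row["rank"], row["videoId"])
# 1 a1
# 2 b2
```

Numbering after deduplication gives consecutive ranks; numbering before it would instead preserve each video's original position in the API response. Which one is right depends on whether "rank" should mean the position YouTube showed or the position among the rows actually stored.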

import re
from datetime import datetime, timedelta

def get_length_minutes(s):
    # convert "M:SS" or "H:MM:SS" into total seconds
    t = 0
    for u in s.split(':'):
        t = 60 * t + int(u)
    # convert total seconds to minutes
    return round(t / 60, 2)

def get_published_time(s):
    # parse relative text such as "21 hours ago" or "1 day ago"
    unit_list = ["weeks", "days", "hours", "minutes",
                 "seconds", "milliseconds", "microseconds"]
    now = datetime.now()
    parts = s.split()
    if len(parts) < 2:
        return now
    amount, unit = parts[0], parts[1]
    if unit in unit_list:
        time_dict = {unit: float(amount)}
    elif unit + "s" in unit_list:
        time_dict = {unit + "s": float(amount)}
    elif unit in ["month", "months"]:
        # approximate a month as 30 days
        time_dict = {"days": float(amount) * 30}
    elif unit in ["year", "years"]:
        # approximate a year as 365 days
        time_dict = {"days": float(amount) * 365}
    else:
        # unrecognized format: fall back to the extraction time
        return now
    return now - timedelta(**time_dict)

def get_views_count(s):
    # "80,090,195 views" -> "80090195"
    count = ''.join(re.findall(r'\d+', s))
    # get count in millions (0 if the text has no digits, e.g. "No views")
    return round(float(count) / 1E6, 2) if count else 0.0

def transform_data(data):
    # remove rows where at least one element is missing
    data = data.dropna()
    # remove duplicate rows
    data = data.drop_duplicates()
    # add column of current ranking (1 = top of the trending list)
    data.index = data.index + 1
    data = data.rename_axis('rank').reset_index()
    # add column of extraction time
    data["extracted_at"] = datetime.now()
    # convert text to other formats
    data["length_minutes"] = data["lengthText"].apply(get_length_minutes)
    data["published_time"] = data["publishedTimeText"].apply(get_published_time)
    data["views_millions"] = data["viewCountText"].apply(get_views_count)
    # remove the original text columns
    data = data.drop(columns=["lengthText", "publishedTimeText", "viewCountText"])
    return data

df_videos_clean = transform_data(df_videos)
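get_published_time reads the clock internally, so its result changes on every run and is awkward to test. A hedged variant, the hypothetical parse_relative_time, takes the reference time as a parameter and keeps the same approximations (a month is 30 days, a year 365 days):

```python
from datetime import datetime, timedelta

# Units timedelta accepts directly, and approximate day counts for the rest.
_TIMEDELTA_UNITS = {"week", "day", "hour", "minute", "second"}
_APPROX_DAYS = {"month": 30, "year": 365}


def parse_relative_time(text: str, now: datetime) -> datetime:
    """Turn e.g. '21 hours ago' into now - 21h; fall back to `now` if unparseable."""
    parts = text.split()
    if len(parts) < 2:
        return now
    try:
        amount = float(parts[0])
    except ValueError:
        return now
    unit = parts[1].lower()
    if unit.endswith("s"):
        unit = unit[:-1]  # 'hours' -> 'hour'
    if unit in _TIMEDELTA_UNITS:
        return now - timedelta(**{unit + "s": amount})
    if unit in _APPROX_DAYS:
        return now - timedelta(days=amount * _APPROX_DAYS[unit])
    return now


ref = datetime(2022, 8, 20, 12, 0)
print(parse_relative_time("21 hours ago", ref))  # 2022-08-19 15:00:00
print(parse_relative_time("2 months ago", ref))  # 2022-06-21 12:00:00
```

In transform_data, one could compute now = datetime.now() once and apply lambda s: parse_relative_time(s, now), so every row in a batch shares the same reference time as extracted_at.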

Loading

I took the following steps:

  • Step 1: sign up for AWS.
  • Step 2: follow the Tutorial and select the “Free Tier” option to create a MySQL DB instance in AWS RDS.
  • Step 3: follow the Instruction to connect MySQL Workbench to AWS RDS.
  • Step 4: create a schema and the table youtube_trending_videos in the AWS RDS DB instance.
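The post does not show the table definition. Below is a hedged sketch of DDL whose columns match what transform_data produces; the column types and lengths are assumptions. The column `rank` is back-quoted because RANK is a reserved word in MySQL 8.0. The statement is kept in a Python string so it can be run from Python or pasted into MySQL Workbench, and the check at the end uses SQLite only as a stand-in, since SQLite also accepts back-quoted identifiers:

```python
import sqlite3

# Hypothetical DDL for youtube_trending_videos; types and lengths are assumptions.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS youtube_trending_videos (
    `rank`          INT          NOT NULL,  -- RANK is reserved in MySQL 8.0
    channelId       VARCHAR(64)  NOT NULL,
    channelName     VARCHAR(255),
    title           VARCHAR(255),
    videoId         VARCHAR(32)  NOT NULL,
    extracted_at    DATETIME     NOT NULL,
    length_minutes  FLOAT,
    published_time  DATETIME,
    views_millions  FLOAT
)
"""

# Quick syntax check against an in-memory SQLite database (a stand-in, not MySQL).
conn = sqlite3.connect(":memory:")
conn.execute(CREATE_TABLE_SQL)
conn.execute(
    "INSERT INTO youtube_trending_videos VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    (1, "UC_example", "Example channel", "Example title", "vid_example",
     "2022-08-20 12:00:00", 3.23, "2022-08-19 15:00:00", 80.09),  # hypothetical row
)
row_count = conn.execute("SELECT COUNT(*) FROM youtube_trending_videos").fetchone()[0]
print(row_count)  # 1
```

Because every load appends a new snapshot, the same videoId will appear once per run; if a primary key is wanted, a composite key on (videoId, extracted_at) fits that pattern, provided the transform step deduplicates by videoId.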

Here is the code to load the DataFrame into MySQL.

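import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
# pymysql is the driver behind the "mysql+pymysql" URL; importing it fails early if it is missing
import pymysql
import api_keys

def get_engine():
    # building the URL from parts avoids breaking on special characters (e.g. "@") in the password
    url = URL.create(
        drivername="mysql+pymysql",
        username=api_keys.mysql_user,
        password=api_keys.mysql_pwd,
        host=api_keys.aws_endpoint,
        port=int(api_keys.mysql_port),
        database=api_keys.mysql_db,
    )
    return create_engine(url)

def load_data(data, table):
    engine = get_engine()
    try:
        # engine.begin() opens a transaction and commits it if to_sql succeeds
        with engine.begin() as connection:
            data.to_sql(table, con=connection, if_exists='append', index=False)
        print("Success")
    except Exception as e:
        # report the actual error instead of hiding it
        print(f"Found Error: {e}")
    finally:
        engine.dispose()

load_data(df_videos_clean, "youtube_trending_videos")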
Success

Checking the table in MySQL Workbench confirms that the data was saved successfully.
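The same check can be scripted instead of done by eye. A minimal sketch, assuming any DB-API connection (for example one returned by pymysql.connect); the helper name latest_snapshot_size is hypothetical, and the demo uses an in-memory SQLite table with made-up rows as a stand-in for MySQL:

```python
import re
import sqlite3


def latest_snapshot_size(conn, table: str = "youtube_trending_videos"):
    """Return (extracted_at, row_count) for the most recent load, or None if empty."""
    # Table names cannot be bound as query parameters, so allow only plain identifiers.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"unsafe table name: {table!r}")
    cur = conn.cursor()
    cur.execute(
        f"SELECT extracted_at, COUNT(*) FROM {table} "
        "GROUP BY extracted_at ORDER BY extracted_at DESC LIMIT 1"
    )
    row = cur.fetchone()
    cur.close()
    return tuple(row) if row else None


# Demo with SQLite standing in for MySQL (hypothetical rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE youtube_trending_videos (videoId TEXT, extracted_at TEXT)")
conn.executemany(
    "INSERT INTO youtube_trending_videos VALUES (?, ?)",
    [("a1", "2022-08-20 12:00:00"), ("b2", "2022-08-20 12:00:00"),
     ("a1", "2022-08-19 12:00:00")],
)
print(latest_snapshot_size(conn))  # ('2022-08-20 12:00:00', 2)
```

This works because transform_data stamps every row of one run with the same extracted_at value, so grouping by it counts the rows of each load.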

Screenshot: video data saved in MySQL

The end

In Part 1, I created a simple ETL pipeline that extracts YouTube trending video data, transforms it with Python libraries, and loads it into MySQL on AWS RDS. However, the script still runs on my laptop.

In Part 2, I’m going to use AWS Lambda to make the pipeline serverless and run the code daily.