[Python] 유튜브 조회수 데이터 시각화

반응형

여러 비즈니스 이해관계자들을 위해 '예쁜' 차트를 만들어내는 과정은 생각보다 훨씬 복잡하고 지루하다. 특히 유튜브 API와 같은 외부 데이터는 전처리 과정에서 예상치 못한 문제들이 계속 발생한다. 오늘은 한번 파이썬 기반 유튜브 데이터 시각화 방법을 다뤄보려고 한다. 화려한 그래프보다는 안정적인 코드, 재사용 가능한 패턴, 그리고 흔히 발생하는 문제 해결에 초점을 맞추려고 하는데. 잘될까

유튜브 API 접근 및 데이터 추출하기

유튜브 데이터 분석을 시작하기 전에 가장 먼저 해야 할 일은 API 키를 발급받는 것이다. 구글 개발자 콘솔에서 프로젝트를 생성하고 YouTube Data API v3를 활성화한 다음 API 키를 발급받아야 한다. 이 과정은 구글의 문서화가 잘 되어 있기 때문에 상세히 설명하지 않는다. 아래 코드를 실행하기 위해서는 반드시 자신의 API 키가 필요하니 발급받아 두자.

API 키를 준비했으면 필요한 라이브러리부터 설치한다. 가상환경을 사용하는 것이 좋다.

pip install google-api-python-client pandas matplotlib seaborn plotly dash

이제 YouTube API를 사용하여 채널 또는 특정 비디오의 데이터를 가져오는 기본 코드를 구현할 차례다. 아래 코드는 특정 채널의 최근 비디오 목록과 각 비디오의 조회수를 가져오는 함수다.

import os
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def get_channel_videos(api_key, channel_id, max_results=50):
    """
    특정 채널의 동영상 정보를 가져오는 함수
    
    Parameters:
    - api_key: YouTube Data API 키
    - channel_id: 분석할 YouTube 채널 ID
    - max_results: 가져올 최대 비디오 수 (최대 50개)
    
    Returns:
    - videos_data: 비디오 정보가 담긴 DataFrame
    """
    try:
        # API 클라이언트 객체 생성
        youtube = build('youtube', 'v3', developerKey=api_key)
        
        # 채널의 업로드 재생목록 ID 가져오기
        channels_response = youtube.channels().list(
            part='contentDetails',
            id=channel_id
        ).execute()
        
        # 채널 정보에서 업로드 재생목록 ID 추출
        # 실무에서는 항상 예외 처리를 해두는 것이 좋다
        if 'items' not in channels_response or not channels_response['items']:
            raise ValueError(f"해당 채널 ID({channel_id})에 대한 정보를 찾을 수 없습니다.")
            
        uploads_playlist_id = channels_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
        
        # 재생목록에서 비디오 정보 가져오기
        videos = []
        next_page_token = None
        
        # 페이징 처리된 API 응답을 모두 가져오기 위한 반복문
        while True:
            playlist_response = youtube.playlistItems().list(
                part='snippet',
                playlistId=uploads_playlist_id,
                maxResults=max_results,
                pageToken=next_page_token
            ).execute()
            
            # 비디오 ID만 추출하여 저장
            video_ids = [item['snippet']['resourceId']['videoId'] 
                        for item in playlist_response['items']]
            
            # 비디오 세부 정보 (조회수 등) 가져오기
            videos_response = youtube.videos().list(
                part='statistics,snippet',
                id=','.join(video_ids)
            ).execute()
            
            # 필요한 정보만 추출하여 리스트에 추가
            for video in videos_response['items']:
                video_data = {
                    'video_id': video['id'],
                    'title': video['snippet']['title'],
                    'published_at': video['snippet']['publishedAt'],
                    'view_count': int(video['statistics'].get('viewCount', 0)),
                    'like_count': int(video['statistics'].get('likeCount', 0)),
                    'comment_count': int(video['statistics'].get('commentCount', 0))
                }
                videos.append(video_data)
            
            # 다음 페이지가 있는지 확인
            next_page_token = playlist_response.get('nextPageToken')
            if not next_page_token:
                break
        
        # DataFrame으로 변환
        videos_df = pd.DataFrame(videos)
        
        # 날짜 형식 변환
        videos_df['published_at'] = pd.to_datetime(videos_df['published_at'])
        
        return videos_df
        
    except HttpError as e:
        print(f"YouTube API 오류 발생: {e}")
        return pd.DataFrame()
    except Exception as e:
        print(f"오류 발생: {e}")
        return pd.DataFrame()

조회수 데이터 정제와 전처리

유튜브 API에서 가져온 데이터는 대부분 그대로 사용하기 어렵다. 특히 시계열 분석을 위해서는 날짜 형식을 맞추고, 누락된 값을 처리하고, 이상치를 제거하는 등의 전처리 작업이 필요하다. 아래는 데이터 정제를 위한 몇 가지 함수다.

먼저 API로 얻은 데이터의 다양한 문제점과 해결 방법을 표로 정리하면 다음과 같다.

문제점 원인 해결 방법
누락된 조회수 데이터 비공개 또는 삭제된 동영상 fillna() 메서드로 0 또는 이전 값으로 대체
이상치(극단값) 존재 바이럴 콘텐츠, 광고 등의 영향 IQR 방식으로 이상치 탐지 및 제거/조정
날짜 형식 불일치 API 응답의 ISO 형식 vs. 분석용 날짜 형식 pandas의 to_datetime() 활용
시간대(Timezone) 문제 UTC 기준 API 응답 tz_localize() 및 tz_convert() 사용
데이터 타입 불일치 API 응답의 문자열 형식 숫자 astype() 메서드로 형변환

이제 위 문제들을 해결하는 코드를 작성해보자. 다음은 데이터 정제를 위한 함수다.

def clean_youtube_data(df):
    """
    유튜브 API에서 가져온 데이터를 정제하는 함수
    
    Parameters:
    - df: 원본 YouTube 데이터 DataFrame
    
    Returns:
    - cleaned_df: 정제된 DataFrame
    """
    # 데이터 복사본 생성 (원본 변경 방지)
    cleaned_df = df.copy()
    
    # 1. 누락된 데이터 처리
    # 조회수, 좋아요 수, 댓글 수 등이 없는 경우 0으로 대체
    numeric_cols = ['view_count', 'like_count', 'comment_count']
    cleaned_df[numeric_cols] = cleaned_df[numeric_cols].fillna(0)
    
    # 2. 데이터 타입 변환
    # 숫자형 컬럼 변환
    for col in numeric_cols:
        cleaned_df[col] = cleaned_df[col].astype(int)
    
    # 3. 날짜 형식 처리
    # 이미 to_datetime으로 변환했지만, 시간대 조정이 필요할 수 있음
    if 'published_at' in cleaned_df.columns:
        # UTC 기준 시간을 로컬 시간대로 변환 (예: 한국 시간)
        # cleaned_df['published_at'] = cleaned_df['published_at'].dt.tz_localize('UTC').dt.tz_convert('Asia/Seoul')
        
        # 날짜만 필요한 경우
        cleaned_df['published_date'] = cleaned_df['published_at'].dt.date
        
        # 연도와 월만 필요한 경우 (월별 분석용)
        cleaned_df['year_month'] = cleaned_df['published_at'].dt.strftime('%Y-%m')
    
    # 4. 이상치 처리 (IQR 방법)
    for col in numeric_cols:
        Q1 = cleaned_df[col].quantile(0.25)
        Q3 = cleaned_df[col].quantile(0.75)
        IQR = Q3 - Q1
        
        # 이상치 경계 설정 (보통 1.5 * IQR 사용)
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        # 이상치 처리 방법 1: 이상치 제거
        # cleaned_df = cleaned_df[(cleaned_df[col] >= lower_bound) & (cleaned_df[col] <= upper_bound)]
        
        # 이상치 처리 방법 2: 경계값으로 조정 (실무에서 더 많이 사용)
        cleaned_df.loc[cleaned_df[col] < lower_bound, col] = lower_bound
        cleaned_df.loc[cleaned_df[col] > upper_bound, col] = upper_bound
    
    # 5. 추가 피처 생성
    # 업로드 후 경과 일수 계산
    if 'published_at' in cleaned_df.columns:
        current_date = pd.Timestamp.now()
        cleaned_df['days_since_published'] = (current_date - cleaned_df['published_at']).dt.days
        
        # 일평균 조회수 계산
        cleaned_df['views_per_day'] = cleaned_df['view_count'] / cleaned_df['days_since_published']
        cleaned_df['views_per_day'] = cleaned_df['views_per_day'].fillna(0)  # 0으로 나누기 방지
    
    return cleaned_df

Matplotlib과 Seaborn으로 기본 차트 그리기

정제된 데이터를 기반으로 이제 시각화를 시작할 수 있다. 파이썬에서 가장 널리 사용되는 시각화 라이브러리는 Matplotlib와 Seaborn이다. 이 두 라이브러리는 정적 시각화에 적합하다. 기본적인 시각화부터 시작해보자.

Matplotlib과 Seaborn을 이용해 유튜브 데이터를 효과적으로 시각화하는 방법은 다양하다. 다음은 자주 사용되는 시각화 유형들이다:

  1. 시간에 따른 조회수 추이 (라인 차트)
  2. 인기 동영상 Top 10 (바 차트)
  3. 조회수와 좋아요 수 관계 (산점도)
  4. 월별 업로드 수와 평균 조회수 (복합 차트)
  5. 조회수 분포 (히스토그램)
  6. 업로드 요일별 평균 조회수 (바 차트)

이러한 시각화를 위한 함수를 구현해보자. 다음은 기본적인 시각화를 위한 함수들이다.

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from matplotlib.dates import DateFormatter
import matplotlib.ticker as ticker

# 시각화를 위한 기본 설정
plt.rc('font', family='DejaVu Sans')  # 한글 폰트가 필요한 경우 적절히 변경
plt.rc('axes', unicode_minus=False)   # 마이너스 부호 깨짐 방지
plt.rc('figure', figsize=(12, 6))     # 기본 그래프 크기
sns.set_style('whitegrid')            # Seaborn 스타일 설정

def plot_views_trend(df, title="시간에 따른 조회수 추이", save_path=None):
    """시간에 따른 조회수 추이를 라인 차트로 그리는 함수"""
    # 날짜별로 정렬
    df_sorted = df.sort_values('published_at')
    
    plt.figure(figsize=(14, 7))
    plt.plot(df_sorted['published_at'], df_sorted['view_count'], marker='o', linestyle='-', color='#1f77b4')
    
    # x축 날짜 포맷 설정
    plt.gca().xaxis.set_major_formatter(DateFormatter('%Y-%m-%d'))
    plt.xticks(rotation=45)
    
    # y축에 천 단위 쉼표 추가
    plt.gca().yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))
    
    plt.title(title, fontsize=16, pad=20)
    plt.xlabel('게시일', fontsize=12, labelpad=10)
    plt.ylabel('조회수', fontsize=12, labelpad=10)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    
    plt.show()

def plot_top_videos(df, n=10, title="인기 동영상 Top 10", save_path=None):
    """인기 동영상 Top N을 바 차트로 그리는 함수"""
    # 조회수 기준 상위 N개 동영상 선택
    top_n = df.nlargest(n, 'view_count')
    
    plt.figure(figsize=(14, 8))
    
    # 가로 바 차트 그리기
    bars = plt.barh(top_n['title'], top_n['view_count'], color='#2ca02c')
    
    # 바 끝에 값 표시
    for i, bar in enumerate(bars):
        plt.text(bar.get_width() + bar.get_width() * 0.01, 
                 bar.get_y() + bar.get_height()/2, 
                 f'{int(bar.get_width()):,}', 
                 va='center', fontsize=10)
    
    plt.title(title, fontsize=16, pad=20)
    plt.xlabel('조회수', fontsize=12, labelpad=10)
    # y축 레이블 제거 (동영상 제목이 표시되어 있으므로)
    plt.ylabel('')
    
    # 제목이 너무 길면 자르기
    plt.gca().set_yticklabels([title[:40] + '...' if len(title) > 40 else title 
                              for title in top_n['title']])
    
    # x축에 천 단위 쉼표 추가
    plt.gca().xaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))
    
    plt.grid(True, axis='x', alpha=0.3)
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    
    plt.show()

def plot_views_likes_scatter(df, title="조회수와 좋아요 수의 관계", save_path=None):
    """조회수와 좋아요 수의 관계를 산점도로 그리는 함수"""
    plt.figure(figsize=(12, 8))
    
    # 산점도 그리기
    sns.scatterplot(data=df, x='view_count', y='like_count', alpha=0.6, s=100)
    
    # 추세선 추가
    sns.regplot(data=df, x='view_count', y='like_count', 
                scatter=False, ci=None, line_kws={"color": "red"})
    
    plt.title(title, fontsize=16, pad=20)
    plt.xlabel('조회수', fontsize=12, labelpad=10)
    plt.ylabel('좋아요 수', fontsize=12, labelpad=10)
    
    # 축에 천 단위 쉼표 추가
    plt.gca().xaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))
    plt.gca().yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}'))
    
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
    
    plt.show()
📝 자주 묻는 질문: 한글 폰트 문제

Matplotlib에서 한글을 사용할 때 글자가 깨지는 문제가 자주 발생한다. 이 문제를 해결하려면 다음 코드를 실행하기 전에 한글 폰트를 설정해야 한다.

# Windows
plt.rc('font', family='Malgun Gothic')

# macOS
plt.rc('font', family='AppleGothic')

# Linux/범용
plt.rc('font', family='NanumGothic')
  

시스템에 해당 폰트가 설치되어 있어야 하며, 설치되지 않은 경우 'matplotlib-font-manager' 패키지를 사용하여 폰트를 설치하고 등록할 수 있다.

Plotly를 활용한 인터랙티브 시각화

Matplotlib과 Seaborn은 정적 시각화에 적합하지만, 사용자 상호작용이 필요한 경우에는 Plotly가 더 적합하다. Plotly는 자바스크립트 기반 시각화 라이브러리인 Plotly.js를 파이썬에서 사용할 수 있게 해주는 라이브러리다. 인터랙티브한 차트를 만들어 사용자가 데이터를 탐색할 수 있게 해준다.

다음은 Plotly를 사용하여 유튜브 조회수 데이터를 인터랙티브하게 시각화하는 예제다. 이 코드는 HTML 파일로 저장하여 웹 브라우저에서 열어볼 수 있다.

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd

def create_interactive_views_trend(df, title="시간에 따른 조회수 추이", save_path=None):
    """
    시간에 따른 조회수 추이를 인터랙티브 라인 차트로 그리는 함수
    
    Parameters:
    - df: 정제된 YouTube 데이터 DataFrame
    - title: 차트 제목
    - save_path: HTML 파일로 저장할 경로 (None이면 저장하지 않음)
    """
    # 날짜별로 정렬
    df_sorted = df.sort_values('published_at')
    
    # 그래프 생성
    fig = px.line(df_sorted, x='published_at', y='view_count', 
                  title=title,
                  labels={'published_at': '게시일', 'view_count': '조회수'},
                  hover_data=['title', 'like_count', 'comment_count'])
    
    # 레이아웃 조정
    fig.update_layout(
        title=dict(
            text=title,
            font=dict(size=20),
            x=0.5,  # 중앙 정렬
        ),
        xaxis_title='게시일',
        yaxis_title='조회수',
        hovermode='closest',
        template='plotly_white'
    )
    
    # 그리드 추가
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
    
    if save_path:
        fig.write_html(save_path)
    
    return fig

def create_interactive_top_videos(df, n=10, title="인기 동영상 Top 10", save_path=None):
    """
    인기 동영상 Top N을 인터랙티브 바 차트로 그리는 함수
    """
    # 조회수 기준 상위 N개 동영상 선택
    top_n = df.nlargest(n, 'view_count')
    
    # 제목이 너무 길면 자르기
    top_n['short_title'] = top_n['title'].apply(lambda x: x[:40] + '...' if len(x) > 40 else x)
    
    # 그래프 생성
    fig = px.bar(top_n, x='view_count', y='short_title', 
                orientation='h',  # 가로 바 차트
                title=title,
                labels={'view_count': '조회수', 'short_title': ''},
                hover_data=['title', 'published_at', 'like_count', 'comment_count'])
    
    # 레이아웃 조정
    fig.update_layout(
        title=dict(
            text=title,
            font=dict(size=20),
            x=0.5,  # 중앙 정렬
        ),
        yaxis=dict(
            autorange="reversed"  # 조회수 많은 순으로 위에서부터 표시
        ),
        hovermode='closest',
        template='plotly_white'
    )
    
    # 그리드 추가
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
    
    if save_path:
        fig.write_html(save_path)
    
    return fig

def create_interactive_scatter_with_trendline(df, title="조회수와 좋아요 수의 관계", save_path=None):
    """
    조회수와 좋아요 수의 관계를 인터랙티브 산점도로 그리는 함수
    """
    # 그래프 생성
    fig = px.scatter(df, x='view_count', y='like_count',
                    title=title,
                    labels={'view_count': '조회수', 'like_count': '좋아요 수'},
                    hover_data=['title', 'published_at'],
                    trendline='ols',  # OLS 추세선 추가
                    trendline_color_override='red')
    
    # 레이아웃 조정
    fig.update_layout(
        title=dict(
            text=title,
            font=dict(size=20),
            x=0.5,  # 중앙 정렬
        ),
        hovermode='closest',
        template='plotly_white'
    )
    
    # 그리드 추가
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='lightgray')
    
    if save_path:
        fig.write_html(save_path)
    
    return fig

def create_monthly_performance_chart(df, title="월별 업로드 수와 평균 조회수", save_path=None):
    """
    월별 업로드 수와 평균 조회수를 인터랙티브 복합 차트로 그리는 함수
    """
    # 월별 데이터 준비
    monthly_data = df.copy()
    monthly_data['year_month'] = monthly_data['published_at'].dt.strftime('%Y-%m')
    
    # 월별 업로드 수와 평균 조회수 계산
    monthly_stats = monthly_data.groupby('year_month').agg(
        upload_count=('video_id', 'count'),
        avg_views=('view_count', 'mean'),
        total_views=('view_count', 'sum')
    ).reset_index()
    
    # 복합 그래프 생성 (서브플롯)
    fig = make_subplots(specs=[[{"secondary_y": True}]])
    
    # 바 차트: 월별 업로드 수
    fig.add_trace(
        go.Bar(
            x=monthly_stats['year_month'],
            y=monthly_stats['upload_count'],
            name="업로드 수",
            marker_color='#2ca02c'
        ),
        secondary_y=False  # 주 y축 사용
    )
    
    # 라인 차트: 월별 평균 조회수
    fig.add_trace(
        go.Scatter(
            x=monthly_stats['year_month'],
            y=monthly_stats['avg_views'],
            name="평균 조회수",
            marker_color='#1f77b4',
            mode='lines+markers'
        ),
        secondary_y=True  # 보조 y축 사용
    )
    
    # 축 이름 업데이트
    fig.update_xaxes(title_text="년월")
    fig.update_yaxes(title_text="업로드 수", secondary_y=False)
    fig.update_yaxes(title_text="평균 조회수", secondary_y=True)
    
    # 레이아웃 조정
    fig.update_layout(
        title=dict(
            text=title,
            font=dict(size=20),
            x=0.5,  # 중앙 정렬
        ),
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ),
        hovermode='x unified',
        template='plotly_white'
    )
    
    if save_path:
        fig.write_html(save_path)
    
    return fig

Dash로 실시간 모니터링 대시보드 구축

단일 차트를 넘어서 여러 시각화를 하나의 대시보드로 통합하고 싶다면 Dash를 사용하는 것이 좋다. Dash는 Plotly를 기반으로 한 웹 애플리케이션 프레임워크로, 파이썬만으로 인터랙티브한 데이터 대시보드를 구축할 수 있게 해준다.

다음은 Dash를 사용하여 유튜브 조회수 데이터를 실시간으로 모니터링할 수 있는 대시보드를 구축하는 예제다. 다양한 대시보드 구성 요소와 그 특징을 표로 먼저 살펴보자.

대시보드 구성 요소 특징 구현 난이도
KPI 카드 주요 성과 지표 한눈에 보기 (총 조회수, 평균 조회수 등) 쉬움
시계열 차트 시간에 따른 성과 추이 표시 (일간/주간/월간 조회수 변화) 중간
상호작용 필터 날짜 범위, 동영상 카테고리 등으로 데이터 필터링 중간
데이터 테이블 원시 데이터 탐색 및 정렬 기능 쉬움
드릴다운 기능 특정 데이터 포인트 클릭 시 세부 정보 표시 어려움
데이터 자동 갱신 설정된 간격으로 API에서 새 데이터 가져오기 어려움

이제 Dash를 사용하여 간단한 유튜브 조회수 모니터링 대시보드를 구현해보자.

# youtube_dashboard.py
import dash
from dash import dcc, html, dash_table
from dash.dependencies import Input, Output
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 앞서 정의한 함수들 가져오기
from youtube_data import get_channel_videos, clean_youtube_data

# 앱 초기화
app = dash.Dash(__name__, title="유튜브 채널 분석 대시보드")
server = app.server  # 배포를 위한 서버 설정

# API 키와 채널 ID 설정 (실제 코드에서는 환경 변수 등으로 관리)
API_KEY = "YOUR_API_KEY"  # 실제 API 키로 변경 필요
CHANNEL_ID = "CHANNEL_ID"  # 분석할 채널 ID로 변경 필요

# 데이터 로드 및 전처리 함수
def load_data():
    try:
        # API에서 데이터 가져오기
        raw_data = get_channel_videos(API_KEY, CHANNEL_ID, max_results=50)
        
        # 데이터 정제
        if not raw_data.empty:
            cleaned_data = clean_youtube_data(raw_data)
            return cleaned_data
        else:
            # 테스트용 더미 데이터 (API 키가 없는 경우 등을 위한 대비책)
            dates = pd.date_range(start=datetime.now() - timedelta(days=100), periods=20, freq='5D')
            dummy_data = pd.DataFrame({
                'video_id': [f'vid_{i}' for i in range(20)],
                'title': [f'테스트 비디오 {i}' for i in range(20)],
                'published_at': dates,
                'view_count': np.random.randint(1000, 100000, 20),
                'like_count': np.random.randint(100, 5000, 20),
                'comment_count': np.random.randint(10, 500, 20)
            })
            return clean_youtube_data(dummy_data)
    except Exception as e:
        print(f"데이터 로드 중 오류 발생: {e}")
        return pd.DataFrame()

# 초기 데이터 로드
df = load_data()

# 앱 레이아웃 정의
app.layout = html.Div([
    # 대시보드 헤더
    html.Div([
        html.H1("유튜브 채널 분석 대시보드", 
                style={'textAlign': 'center', 'color': '#212121', 'marginBottom': 20}),
        html.P("유튜브 API를 활용한 채널 성과 모니터링", 
               style={'textAlign': 'center', 'color': '#666', 'marginBottom': 30})
    ]),
    
    # KPI 카드 섹션
    html.Div([
        html.Div([
            html.H3("총 조회수", style={'textAlign': 'center', 'color': '#444'}),
            html.H2(f"{df['view_count'].sum():,}", 
                    style={'textAlign': 'center', 'color': '#212121'})
        ], style={'flex': '1', 'backgroundColor': '#f9f9f9', 'border': '1px solid #ddd', 
                  'borderRadius': '8px', 'padding': '15px', 'margin': '10px'}),
        
        html.Div([
            html.H3("평균 조회수", style={'textAlign': 'center', 'color': '#444'}),
            html.H2(f"{int(df['view_count'].mean()):,}", 
                    style={'textAlign': 'center', 'color': '#212121'})
        ], style={'flex': '1', 'backgroundColor': '#f9f9f9', 'border': '1px solid #ddd', 
                  'borderRadius': '8px', 'padding': '15px', 'margin': '10px'}),
        
        html.Div([
            html.H3("총 동영상 수", style={'textAlign': 'center', 'color': '#444'}),
            html.H2(f"{len(df)}", 
                    style={'textAlign': 'center', 'color': '#212121'})
        ], style={'flex': '1', 'backgroundColor': '#f9f9f9', 'border': '1px solid #ddd', 
                  'borderRadius': '8px', 'padding': '15px', 'margin': '10px'}),
    ], style={'display': 'flex', 'justifyContent': 'space-between', 'marginBottom': 30}),
    
    # 날짜 필터 섹션
    html.Div([
        html.H3("날짜 범위 선택:"),
        dcc.DatePickerRange(
            id='date-range',
            start_date=df['published_at'].min().date() if not df.empty else None,
            end_date=df['published_at'].max().date() if not df.empty else None,
            display_format='YYYY-MM-DD'
        )
    ], style={'marginBottom': 20}),
    
    # 그래프 섹션 - 시간에 따른 조회수 추이
    html.Div([
        html.H3("시간에 따른 조회수 추이", style={'marginBottom': 10}),
        dcc.Graph(id='views-trend-chart')
    ], style={'marginBottom': 30}),
    
    # 그래프 섹션 - 인기 동영상 Top 10
    html.Div([
        html.H3("인기 동영상 Top 10", style={'marginBottom': 10}),
        dcc.Graph(id='top-videos-chart')
    ], style={'marginBottom': 30}),
    
    # 그래프 섹션 - 조회수와 좋아요 수 관계
    html.Div([
        html.H3("조회수와 좋아요 수의 관계", style={'marginBottom': 10}),
        dcc.Graph(id='scatter-chart')
    ], style={'marginBottom': 30}),
    
    # 데이터 테이블 섹션
    html.Div([
        html.H3("비디오 데이터", style={'marginBottom': 10}),
        dash_table.DataTable(
            id='video-data-table',
            columns=[
                {'name': '제목', 'id': 'title'},
                {'name': '게시일', 'id': 'published_at'},
                {'name': '조회수', 'id': 'view_count', 'type': 'numeric', 'format': {'specifier': ','}},
                {'name': '좋아요 수', 'id': 'like_count', 'type': 'numeric', 'format': {'specifier': ','}},
                {'name': '댓글 수', 'id': 'comment_count', 'type': 'numeric', 'format': {'specifier': ','}}
            ],
            data=df.to_dict('records') if not df.empty else [],
            page_size=10,
            style_table={'overflowX': 'auto'},
            style_cell={'textAlign': 'left', 'padding': '8px'},
            style_header={
                'backgroundColor': '#212121',
                'color': 'white',
                'fontWeight': 'bold'
            },
            style_data_conditional=[
                {
                    'if': {'row_index': 'odd'},
                    'backgroundColor': '#f9f9f9'
                }
            ],
            sort_action='native',
            filter_action='native',
        )
    ]),
    
    # 푸터
    html.Div([
        html.P(f"마지막 업데이트: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
               style={'textAlign': 'center', 'color': '#888', 'fontSize': '0.8em', 'marginTop': 50})
    ])
], style={'padding': '20px', 'fontFamily': 'Arial, sans-serif', 'maxWidth': '1200px', 'margin': '0 auto'})

# 콜백 함수 정의
@app.callback(
    [Output('views-trend-chart', 'figure'),
     Output('top-videos-chart', 'figure'),
     Output('scatter-chart', 'figure'),
     Output('video-data-table', 'data')],
    [Input('date-range', 'start_date'),
     Input('date-range', 'end_date')]
)
def update_dashboard(start_date, end_date):
    # 날짜 필터링
    filtered_df = df.copy()
    
    if start_date and end_date:
        start_date = pd.to_datetime(start_date)
        end_date = pd.to_datetime(end_date)
        filtered_df = filtered_df[(filtered_df['published_at'] >= start_date) & 
                                 (filtered_df['published_at'] <= end_date)]
    
    # 시간에 따른 조회수 추이 그래프
    views_trend_fig = px.line(
        filtered_df.sort_values('published_at'), 
        x='published_at', 
        y='view_count',
        labels={'published_at': '게시일', 'view_count': '조회수'},
        hover_data=['title', 'view_count'],
        template='plotly_white'
    )
    views_trend_fig.update_layout(hovermode='closest')
    
    # 인기 동영상 Top 10 그래프
    top_videos = filtered_df.nlargest(10, 'view_count')
    top_videos['short_title'] = top_videos['title'].apply(lambda x: x[:30] + '...' if len(x) > 30 else x)
    
    top_videos_fig = px.bar(
        top_videos,
        x='view_count',
        y='short_title',
        orientation='h',
        labels={'view_count': '조회수', 'short_title': ''},
        hover_data=['title', 'published_at', 'view_count'],
        template='plotly_white'
    )
    top_videos_fig.update_layout(yaxis={'categoryorder': 'total ascending'})
    
    # 조회수와 좋아요 수 산점도
    scatter_fig = px.scatter(
        filtered_df,
        x='view_count',
        y='like_count',
        labels={'view_count': '조회수', 'like_count': '좋아요 수'},
        hover_data=['title', 'published_at'],
        template='plotly_white',
        trendline='ols',
        trendline_color_override='red'
    )
    
    # 데이터 테이블 업데이트
    table_data = filtered_df.to_dict('records')
    
    return views_trend_fig, top_videos_fig, scatter_fig, table_data

# 실행
if __name__ == '__main__':
    app.run_server(debug=True)

대용량 데이터 처리 최적화 기법

유튜브 채널의 규모가 크거나 데이터 수집 기간이 길어지면 데이터의 양이 급격히 증가할 수 있다. 이런 경우 대용량 데이터를 효율적으로 처리하기 위한 최적화 기법이 필요하다. 여기서는 실제 프로덕션 환경에서 사용할 수 있는 성능 최적화 방법을 살펴본다.

대용량 데이터 처리를 위한 주요 최적화 기법은 다음과 같다:

  • 데이터 샘플링: 전체 데이터셋이 너무 클 경우, 통계적으로 유의미한 샘플을 추출하여 시각화
  • 데이터 집계: 원시 데이터 대신 집계된 데이터(예: 일별, 주별, 월별 평균)를 사용하여 차트 생성
  • 데이터베이스 활용: CSV나 파일 시스템 대신 SQLite, PostgreSQL 등의 데이터베이스 활용
  • 캐싱 도입: 자주 사용되는 쿼리 결과나 계산 결과를 메모리에 캐싱
  • 비동기 처리: 시각화와 데이터 처리를 비동기적으로 수행하여 응답성 향상
  • 병렬 처리: 멀티프로세싱이나 멀티스레딩을 활용하여 데이터 처리 속도 향상
📝 자주 묻는 질문: API 할당량 관리

YouTube Data API는 하루 단위로 할당량이 제한되어 있다. 기본적으로 무료 할당량은 10,000 유닛으로, 이는 쿼리 유형에 따라 다르게 소비된다. 예를 들어, videos.list 호출은 1개당 약 1~5 유닛을 소비한다. 할당량을 효율적으로 관리하는 방법은 다음과 같다:

  1. 요청 시 필요한 부분(part)만 명시하여 할당량 소비 최소화
  2. 데이터를 로컬에 캐싱하고 필요할 때만 API 호출
  3. 배치 요청을 활용하여 여러 ID를 한 번에 요청
  4. 정기적인 스케줄링으로 할당량 분산

할당량이 초과되면 24시간 동안 더 이상 API 요청을 할 수 없으므로, 프로덕션 환경에서는 반드시 적절한 예외 처리와 재시도 로직을 구현해야 한다.

다음은 대용량 유튜브 데이터를 효율적으로 처리하기 위한 코드 예제다. SQLite 데이터베이스를 사용하여 데이터를 저장하고, 필요한 경우에만 API를 호출하여 할당량을 절약하는 방식으로 구현되어 있다.

import os
import sqlite3
import pandas as pd
import numpy as np
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta
import time
import json
import concurrent.futures
from functools import lru_cache

class YouTubeDataManager:
    """
    대용량 유튜브 데이터를 효율적으로 관리하기 위한 클래스
    - SQLite 데이터베이스 활용
    - 캐싱 및 API 할당량 관리
    - 병렬 처리 지원
    """
    
    def __init__(self, api_key, db_path='youtube_data.db'):
        """
        초기화 함수
        
        Parameters:
        - api_key: YouTube Data API 키
        - db_path: SQLite 데이터베이스 파일 경로
        """
        self.api_key = api_key
        self.db_path = db_path
        self.conn = None
        self.cursor = None
        self.youtube = None
        
        # 데이터베이스 및 테이블 초기화
        self._init_db()
        
        # YouTube API 클라이언트 초기화
        self._init_api_client()
    
    def _init_db(self):
        """데이터베이스 연결 및 테이블 초기화"""
        try:
            self.conn = sqlite3.connect(self.db_path)
            self.cursor = self.conn.cursor()
            
            # 비디오 정보 테이블 생성
            self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS videos (
                video_id TEXT PRIMARY KEY,
                title TEXT,
                published_at TEXT,
                view_count INTEGER,
                like_count INTEGER,
                comment_count INTEGER,
                last_updated TEXT
            )
            ''')
            
            # 채널 정보 테이블 생성
            self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS channels (
                channel_id TEXT PRIMARY KEY,
                title TEXT,
                uploads_playlist_id TEXT,
                last_updated TEXT
            )
            ''')
            
            self.conn.commit()
        except sqlite3.Error as e:
            print(f"데이터베이스 초기화 오류: {e}")
    
    def _init_api_client(self):
        """YouTube API 클라이언트 초기화"""
        try:
            self.youtube = build('youtube', 'v3', developerKey=self.api_key)
        except Exception as e:
            print(f"YouTube API 클라이언트 초기화 오류: {e}")
    
    def close(self):
        """리소스 정리"""
        if self.conn:
            self.conn.close()
    
    def get_channel_info(self, channel_id):
        """
        채널 정보 가져오기 (데이터베이스 우선, 없으면 API 호출)
        
        Parameters:
        - channel_id: 유튜브 채널 ID
        
        Returns:
        - 채널 정보 딕셔너리 또는 None (오류 시)
        """
        # 데이터베이스에서 채널 정보 조회
        self.cursor.execute("SELECT * FROM channels WHERE channel_id = ?", (channel_id,))
        row = self.cursor.fetchone()
        
        # 데이터베이스에 정보가 있고, 24시간 이내에 업데이트된 경우 캐시된 정보 반환
        if row and (datetime.now() - datetime.fromisoformat(row[3])).days < 1:
            return {
                'channel_id': row[0],
                'title': row[1],
                'uploads_playlist_id': row[2],
                'last_updated': row[3]
            }
        
        # 데이터베이스에 없거나 오래된 정보인 경우 API 호출
        try:
            response = self.youtube.channels().list(
                part='snippet,contentDetails',
                id=channel_id
            ).execute()
            
            if 'items' not in response or not response['items']:
                print(f"채널 ID에 해당하는 정보를 찾을 수 없음: {channel_id}")
                return None
            
            channel_info = {
                'channel_id': channel_id,
                'title': response['items'][0]['snippet']['title'],
                'uploads_playlist_id': response['items'][0]['contentDetails']['relatedPlaylists']['uploads'],
                'last_updated': datetime.now().isoformat()
            }
            
            # 데이터베이스에 저장 또는 업데이트
            self.cursor.execute('''
            INSERT OR REPLACE INTO channels (channel_id, title, uploads_playlist_id, last_updated)
            VALUES (?, ?, ?, ?)
            ''', (
                channel_info['channel_id'],
                channel_info['title'],
                channel_info['uploads_playlist_id'],
                channel_info['last_updated']
            ))
            self.conn.commit()
            
            return channel_info
        
        except HttpError as e:
            print(f"YouTube API 오류: {e}")
            return None
        except Exception as e:
            print(f"채널 정보 가져오기 오류: {e}")
            return None
    
    def get_videos_for_channel(self, channel_id, max_results=50, update_threshold_hours=24):
        """
        채널의 비디오 목록 가져오기
        - 캐싱 활용: 최근에 업데이트된 데이터는 API 호출 없이 DB에서 가져옴
        - 필요한 경우에만 API 호출하여 할당량 절약
        
        Parameters:
        - channel_id: 유튜브 채널 ID
        - max_results: 최대 결과 수 (API 호출 시)
        - update_threshold_hours: 업데이트 임계값 (시간 단위, 이 시간 이내에 업데이트된 데이터는 재사용)
        
        Returns:
        - 비디오 정보가 담긴 DataFrame
        """
        # 채널 정보 가져오기
        channel_info = self.get_channel_info(channel_id)
        if not channel_info:
            return pd.DataFrame()
        
        # 업로드 재생목록 ID
        uploads_playlist_id = channel_info['uploads_playlist_id']
        
        # 데이터베이스에서 최근 업데이트 시간 확인
        self.cursor.execute("""
        SELECT MAX(last_updated) FROM videos
        WHERE video_id IN (
            SELECT video_id FROM videos
            WHERE video_id IN (
                SELECT video_id FROM playlist_videos
                WHERE playlist_id = ?
            )
        )
        """, (uploads_playlist_id,))
        
        last_updated_str = self.cursor.fetchone()[0]
        needs_update = True
        
        if last_updated_str:
            last_updated = datetime.fromisoformat(last_updated_str)
            hours_since_update = (datetime.now() - last_updated).total_seconds() / 3600
            if hours_since_update < update_threshold_hours:
                needs_update = False
        
        # 업데이트가 필요한 경우 API 호출
        if needs_update:
            try:
                # 재생목록에서 비디오 ID 가져오기
                video_ids = self._get_video_ids_from_playlist(uploads_playlist_id, max_results)
                
                # 비디오 세부 정보 가져오기 (병렬 처리)
                self._update_videos_info(video_ids)
            
            except Exception as e:
                print(f"비디오 정보 업데이트 오류: {e}")
        
        # 데이터베이스에서 최신 데이터 가져오기
        df = self._get_videos_from_db(channel_id)
        return df
    
    def _get_video_ids_from_playlist(self, playlist_id, max_results=50):
        """재생목록에서 비디오 ID 목록 가져오기"""
        video_ids = []
        next_page_token = None
        
        while True:
            try:
                # API 호출 빈도 제한
                time.sleep(0.1)
                
                response = self.youtube.playlistItems().list(
                    part='snippet',
                    playlistId=playlist_id,
                    maxResults=min(50, max_results),  # 최대 50개씩 요청 가능
                    pageToken=next_page_token
                ).execute()
                
                # 비디오 ID 추출
                for item in response['items']:
                    video_ids.append(item['snippet']['resourceId']['videoId'])
                
                # 다음 페이지 확인
                next_page_token = response.get('nextPageToken')
                
                # 최대 결과 수에 도달하거나 다음 페이지가 없으면 종료
                if not next_page_token or len(video_ids) >= max_results:
                    break
            
            except HttpError as e:
                print(f"재생목록 항목 가져오기 오류: {e}")
                break
        
        return video_ids[:max_results]
    
    def _update_videos_info(self, video_ids, batch_size=50):
        """비디오 세부 정보 가져와서 데이터베이스에 저장 (배치 처리)"""
        # 비디오 ID 목록을 배치로 분할
        batches = [video_ids[i:i+batch_size] for i in range(0, len(video_ids), batch_size)]
        
        for batch in batches:
            try:
                # API 호출 빈도 제한
                time.sleep(0.2)
                
                response = self.youtube.videos().list(
                    part='snippet,statistics',
                    id=','.join(batch)
                ).execute()
                
                current_time = datetime.now().isoformat()
                
                # 데이터베이스에 저장
                for item in response.get('items', []):
                    video_id = item['id']
                    
                    # 값이 없는 경우 기본값 설정
                    stats = item.get('statistics', {})
                    view_count = int(stats.get('viewCount', 0))
                    like_count = int(stats.get('likeCount', 0))
                    comment_count = int(stats.get('commentCount', 0))
                    
                    self.cursor.execute('''
                    INSERT OR REPLACE INTO videos 
                    (video_id, title, published_at, view_count, like_count, comment_count, last_updated)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        video_id,
                        item['snippet']['title'],
                        item['snippet']['publishedAt'],
                        view_count,
                        like_count,
                        comment_count,
                        current_time
                    ))
                
                self.conn.commit()
            
            except HttpError as e:
                print(f"비디오 정보 가져오기 오류: {e}")
            except Exception as e:
                print(f"비디오 정보 처리 오류: {e}")
    
    def _get_videos_from_db(self, channel_id=None):
        """데이터베이스에서 비디오 정보 가져오기"""
        query = """
        SELECT v.video_id, v.title, v.published_at, v.view_count, v.like_count, v.comment_count
        FROM videos v
        """
        
        if channel_id:
            query += """
            JOIN playlist_videos pv ON v.video_id = pv.video_id
            JOIN channels c ON pv.playlist_id = c.uploads_playlist_id
            WHERE c.channel_id = ?
            """
            self.cursor.execute(query, (channel_id,))
        else:
            self.cursor.execute(query)
        
        rows = self.cursor.fetchall()
        
        # DataFrame으로 변환
        df = pd.DataFrame(rows, columns=[
            'video_id', 'title', 'published_at', 'view_count', 'like_count', 'comment_count'
        ])
        
        if not df.empty:
            # 날짜 형식 변환
            df['published_at'] = pd.to_datetime(df['published_at'])
            
            # 데이터 타입 변환
            numeric_cols = ['view_count', 'like_count', 'comment_count']
            df[numeric_cols] = df[numeric_cols].astype(int)
        
        return df
    
    # 데이터 집계 메서드들 (성능 최적화)
    
    @lru_cache(maxsize=32)  # 결과 캐싱
    def get_monthly_stats(self, channel_id):
        """월별 집계 통계 계산 (캐싱 적용)"""
        df = self.get_videos_for_channel(channel_id)
        
        if df.empty:
            return pd.DataFrame()
        
        # 월별 데이터 집계
        df['year_month'] = df['published_at'].dt.strftime('%Y-%m')
        monthly_stats = df.groupby('year_month').agg({
            'video_id': 'count',
            'view_count': ['sum', 'mean'],
            'like_count': ['sum', 'mean']
        }).reset_index()
        
        # 다중 인덱스 컬럼 이름 변경
        monthly_stats.columns = [
            ''.join(col).strip() if col[1] else col[0] 
            for col in monthly_stats.columns.values
        ]
        
        return monthly_stats
    
    def get_top_videos(self, channel_id, n=10, by='view_count'):
        """인기 비디오 Top N 가져오기"""
        df = self.get_videos_for_channel(channel_id)
        
        if df.empty:
            return pd.DataFrame()
        
        # 정렬 및 상위 N개 선택
        return df.nlargest(n, by)[['video_id', 'title', 'published_at', 'view_count', 'like_count', 'comment_count']]
    
    # 병렬 처리 메서드
    
    def process_multiple_channels(self, channel_ids, max_workers=4):
        """여러 채널 동시에 처리 (병렬 처리)"""
        results = {}
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 채널별로 작업 제출
            future_to_channel = {
                executor.submit(self.get_videos_for_channel, channel_id): channel_id 
                for channel_id in channel_ids
            }
            
            # 결과 수집
            for future in concurrent.futures.as_completed(future_to_channel):
                channel_id = future_to_channel[future]
                try:
                    data = future.result()
                    results[channel_id] = data
                except Exception as e:
                    print(f"{channel_id} 처리 중 오류 발생: {e}")
        
        return results


# 사용 예시

def main():
    API_KEY = "YOUR_API_KEY"  # 실제 API 키로 변경 필요
    CHANNEL_ID = "CHANNEL_ID"  # 분석할 채널 ID로 변경 필요
    
    try:
        # 데이터 매니저 초기화
        manager = YouTubeDataManager(API_KEY)
        
        # 채널 비디오 데이터 가져오기
        videos_df = manager.get_videos_for_channel(CHANNEL_ID)
        
        if not videos_df.empty:
            print(f"총 {len(videos_df)}개 비디오를 가져왔습니다.")
            print("\n조회수 상위 5개 비디오:")
            print(videos_df.nlargest(5, 'view_count')[['title', 'view_count']])
            
            # 월별 통계 계산
            monthly_stats = manager.get_monthly_stats(CHANNEL_ID)
            print("\n월별 업로드 및 조회수 통계:")
            print(monthly_stats)
        else:
            print("비디오 데이터를 가져오지 못했습니다.")
        
        # 리소스 정리
        manager.close()
    
    except Exception as e:
        print(f"오류 발생: {e}")

if __name__ == "__main__":
    main()

위 코드는 대용량 유튜브 데이터를 효율적으로 처리하기 위한 다양한 것들이 있다...

  1. SQLite 데이터베이스를 사용하여 데이터 영속성 확보
  2. 캐싱을 통한 불필요한 API 호출 최소화
  3. 배치 처리로 API 요청 효율화
  4. 병렬 처리를 통한 다중 채널 동시 분석
  5. 데이터 집계로 시각화 성능 최적화

이러한 최적화 기법들은 데이터의 규모가 커질수록 더 큰 효과를 발휘한다. 특히 유튜브 API 할당량 제한이 있는 상황에서는 효율적인 데이터 관리가 필수적이다.

마무리

이 글에서는 파이썬을 활용하여 유튜브 조회수 데이터를 시각화하는 방법을 단계별로 살펴봤다. 유튜브 API를 연동하는 방법부터 데이터 정제, 기본 시각화, 인터랙티브 시각화, 대시보드 구축, 그리고 대용량 데이터 처리에 이르기까지 실무에서 활용할 수 있는 다양한 기법을 다뤘다... 감사합니다.

Designed by JB FACTORY