Backend Development

파이썬 GIL과 멀티스레딩: 언제 멀티스레드가 효과적일까?

Kun Woo Kim 2025. 7. 6. 00:15
728x90

파이썬 개발자라면 한 번쯤은 들어봤을 GIL(Global Interpreter Lock). 특히 취업 면접에서 자주 등장하는 단골 질문 중 하나입니다. "GIL 때문에 파이썬에서는 멀티스레딩이 의미가 없다"는 말을 들어본 적이 있으실 텐데요, 과연 이 말이 100% 맞을까요?

오늘은 파이썬의 GIL에 대한 오해를 풀고, 멀티스레딩이 실제로 효과적인 상황들을 알아보겠습니다.


GIL이란 무엇인가?

기본 개념

GIL(Global Interpreter Lock)은 파이썬 인터프리터에서 한 번에 하나의 스레드만 파이썬 바이트코드를 실행할 수 있도록 제한하는 뮤텍스(mutex)입니다.

import threading
import time

def cpu_bound_task():
    # CPU 집약적인 작업 시뮬레이션
    total = 0
    for i in range(10000000):
        total += i * i
    return total

# 멀티스레딩으로 실행해도 성능 향상이 미미함
start_time = time.time()
threads = []
for i in range(4):
    thread = threading.Thread(target=cpu_bound_task)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"멀티스레딩 실행 시간: {time.time() - start_time:.2f}초")

왜 GIL이 존재할까?

GIL이 존재하는 이유를 음식점에 비유해보겠습니다:

  • 음식점(파이썬 인터프리터): 하나의 주방에서 요리를 만듦
  • 요리사(스레드): 여러 명의 요리사가 있지만
  • 주방장(GIL): 한 번에 한 명의 요리사만 주방을 사용할 수 있도록 통제

이런 제약이 있는 이유는:

  1. 메모리 관리의 안전성: 레퍼런스 카운팅 방식의 가비지 컬렉션 보호
  2. C 확장 모듈의 호환성: 스레드 안전하지 않은 C 라이브러리들과의 호환
  3. 구현의 단순성: 복잡한 락 메커니즘 없이도 스레드 안전성 보장

일반적인 오해: "멀티스레딩은 무조건 무의미하다?"

CPU Bound vs I/O Bound

많은 개발자들이 "GIL 때문에 파이썬에서는 멀티스레딩이 효과가 없다"고 생각합니다. 하지만 이는 작업의 성격을 고려하지 않은 판단입니다.

작업 유형 GIL 해제 여부 멀티스레딩 효과
CPU Bound 해제되지 않음 ❌ 효과 없음
I/O Bound ✅ 해제됨 ✅ 효과 있음

CPU Bound 작업의 한계

import time
import threading
from concurrent.futures import ThreadPoolExecutor

def cpu_intensive_task(n):
    """CPU 집약적인 작업"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def test_cpu_bound():
    start_time = time.time()

    # 순차 실행
    sequential_start = time.time()
    for _ in range(4):
        cpu_intensive_task(1000000)
    sequential_time = time.time() - sequential_start

    # 멀티스레딩 실행
    threading_start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
        for future in futures:
            future.result()
    threading_time = time.time() - threading_start

    print(f"순차 실행: {sequential_time:.2f}초")
    print(f"멀티스레딩: {threading_time:.2f}초")
    print(f"성능 비율: {sequential_time/threading_time:.2f}배")

test_cpu_bound()
# 결과: 멀티스레딩이 오히려 더 느릴 수 있음

멀티스레딩이 효과적인 경우: I/O Bound 작업

GIL 해제 시점

중요한 점은 I/O 작업 중에는 GIL이 해제된다는 것입니다. 다음과 같은 작업들에서 GIL이 해제됩니다:

  • 파일 읽기/쓰기
  • 네트워크 요청
  • 데이터베이스 쿼리
  • sleep() 함수 호출

실제 성능 비교 예제

데이터 전처리 작업을 통해 실제 성능 차이를 확인해보겠습니다:

import time
import threading
import os
from concurrent.futures import ThreadPoolExecutor

def unit_task(task_id):
    """하나의 단위 작업: 계산 + I/O"""
    # 1. CPU 작업 시뮬레이션
    result = 0
    for i in range(100000):
        result += i * i

    # 2. I/O 작업: 10MB 파일 쓰기
    data = b'x' * (10 * 1024 * 1024)  # 10MB 데이터
    filename = f'temp_file_{task_id}.dat'

    with open(filename, 'wb') as f:
        f.write(data)

    # 파일 정리
    os.remove(filename)

    return result

def test_performance():
    num_tasks = 20

    # 1. 순차 실행
    print("순차 실행 시작...")
    start_time = time.time()
    for i in range(num_tasks):
        unit_task(i)
    sequential_time = time.time() - start_time

    # 2. 멀티스레딩 실행
    print("멀티스레딩 실행 시작...")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(unit_task, i) for i in range(num_tasks)]
        for future in futures:
            future.result()
    threading_time = time.time() - start_time

    print(f"\n=== 성능 비교 결과 ===")
    print(f"순차 실행: {sequential_time:.2f}초")
    print(f"멀티스레딩: {threading_time:.2f}초")
    print(f"성능 향상: {sequential_time/threading_time:.2f}배")

# 실행
test_performance()

실행 결과 분석

순차 실행 시작...
멀티스레딩 실행 시작...

=== 성능 비교 결과 ===
순차 실행: 15.43초
멀티스레딩: 8.92초
성능 향상: 1.73배

왜 이런 결과가 나올까요?

  1. I/O 대기 시간 활용: 한 스레드가 파일을 쓰는 동안 다른 스레드가 CPU 작업 수행
  2. GIL 해제: 파일 I/O 중에는 GIL이 해제되어 병렬 처리 가능
  3. 전체 처리량 증가: I/O 대기 시간을 CPU 작업으로 효율적으로 활용

실무에서의 활용 사례

1. 웹 크롤링

import requests
import time
from concurrent.futures import ThreadPoolExecutor

def fetch_url(url):
    """단일 URL에서 데이터 가져오기"""
    try:
        response = requests.get(url, timeout=5)
        return f"{url}: {response.status_code}"
    except Exception as e:
        return f"{url}: Error - {str(e)}"

def crawl_websites():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
        'https://httpbin.org/delay/1',
    ]

    # 순차 실행
    start_time = time.time()
    sequential_results = [fetch_url(url) for url in urls]
    sequential_time = time.time() - start_time

    # 멀티스레딩 실행
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        threading_results = list(executor.map(fetch_url, urls))
    threading_time = time.time() - start_time

    print(f"순차 실행: {sequential_time:.2f}초")
    print(f"멀티스레딩: {threading_time:.2f}초")
    print(f"성능 향상: {sequential_time/threading_time:.2f}배")

crawl_websites()

2. 데이터베이스 배치 처리

import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

def create_sample_database():
    """샘플 데이터베이스 생성"""
    conn = sqlite3.connect('sample.db')
    cursor = conn.cursor()

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    ''')

    # 샘플 데이터 삽입
    for i in range(1000):
        cursor.execute(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            (f"User{i}", f"user{i}@example.com")
        )

    conn.commit()
    conn.close()

def process_user_batch(start_id, end_id):
    """사용자 배치 처리"""
    conn = sqlite3.connect('sample.db')
    cursor = conn.cursor()

    # 데이터 조회 (I/O 작업)
    cursor.execute(
        "SELECT * FROM users WHERE id BETWEEN ? AND ?",
        (start_id, end_id)
    )
    users = cursor.fetchall()

    # 데이터 처리 (CPU 작업)
    processed_users = []
    for user in users:
        # 간단한 데이터 변환
        processed_user = (user[0], user[1].upper(), user[2].lower())
        processed_users.append(processed_user)

    conn.close()
    return len(processed_users)

def batch_processing_comparison():
    create_sample_database()

    batch_size = 100
    total_users = 1000
    batches = [(i, min(i + batch_size - 1, total_users)) 
               for i in range(1, total_users + 1, batch_size)]

    # 순차 처리
    start_time = time.time()
    sequential_count = sum(process_user_batch(start, end) for start, end in batches)
    sequential_time = time.time() - start_time

    # 멀티스레딩 처리
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        threading_count = sum(executor.map(lambda batch: process_user_batch(*batch), batches))
    threading_time = time.time() - start_time

    print(f"처리된 사용자 수: {sequential_count}")
    print(f"순차 처리: {sequential_time:.2f}초")
    print(f"멀티스레딩: {threading_time:.2f}초")
    print(f"성능 향상: {sequential_time/threading_time:.2f}배")

언제 무엇을 선택해야 할까?

작업 유형별 권장사항

# 판단 기준을 위한 간단한 가이드

def choose_concurrency_strategy(task_type, io_ratio):
    """
    동시성 전략 선택 가이드

    Args:
        task_type: 'cpu_bound' 또는 'io_bound'
        io_ratio: I/O 작업이 차지하는 비율 (0.0 ~ 1.0)
    """
    if task_type == 'cpu_bound':
        if io_ratio < 0.1:
            return "멀티프로세싱 사용 권장"
        elif io_ratio < 0.3:
            return "멀티프로세싱 또는 asyncio 고려"
        else:
            return "멀티스레딩도 효과적일 수 있음"

    elif task_type == 'io_bound':
        if io_ratio > 0.7:
            return "멀티스레딩 또는 asyncio 적극 권장"
        elif io_ratio > 0.3:
            return "멀티스레딩 권장"
        else:
            return "순차 처리도 괜찮음"

    return "작업 특성을 더 자세히 분석 필요"

# 사용 예시
print(choose_concurrency_strategy('io_bound', 0.8))
# 출력: 멀티스레딩 또는 asyncio 적극 권장

성능 측정 템플릿

import time
import functools

def performance_timer(func):
    """성능 측정 데코레이터"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()

        print(f"{func.__name__} 실행 시간: {end_time - start_time:.2f}초")
        return result
    return wrapper

# 사용 예시
@performance_timer
def my_task():
    # 여기에 측정하고 싶은 작업 코드
    time.sleep(1)

my_task()  # my_task 실행 시간: 1.00초

주의사항과 베스트 프랙티스

1. 스레드 풀 크기 최적화

import os
from concurrent.futures import ThreadPoolExecutor

def optimal_thread_count():
    """최적의 스레드 수 계산"""
    cpu_count = os.cpu_count()

    # I/O bound 작업의 경우
    io_bound_threads = min(32, (cpu_count or 1) + 4)

    # CPU bound 작업의 경우
    cpu_bound_threads = cpu_count or 1

    return {
        'io_bound': io_bound_threads,
        'cpu_bound': cpu_bound_threads
    }

print(f"권장 스레드 수: {optimal_thread_count()}")

2. 스레드 안전성 고려

import threading

# ❌ 스레드 안전하지 않은 코드
shared_counter = 0

def unsafe_increment():
    global shared_counter
    for _ in range(100000):
        shared_counter += 1  # Race condition 발생 가능

# ✅ 스레드 안전한 코드
safe_counter = 0
counter_lock = threading.Lock()

def safe_increment():
    global safe_counter
    for _ in range(100000):
        with counter_lock:
            safe_counter += 1

3. 예외 처리

from concurrent.futures import ThreadPoolExecutor, as_completed

def safe_task_execution(tasks):
    """안전한 작업 실행 with 예외 처리"""
    results = []
    errors = []

    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_task = {executor.submit(task): task for task in tasks}

        for future in as_completed(future_to_task):
            task = future_to_task[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as exc:
                errors.append(f'{task} generated an exception: {exc}')

    return results, errors

멀티프로세싱과의 비교

언제 멀티프로세싱을 사용해야 할까?

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy_task(n):
    """CPU 집약적인 작업"""
    return sum(i * i for i in range(n))

def compare_approaches():
    tasks = [1000000] * 4

    # 멀티스레딩
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_heavy_task, tasks))
    thread_time = time.time() - start_time

    # 멀티프로세싱
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(cpu_heavy_task, tasks))
    process_time = time.time() - start_time

    print(f"멀티스레딩: {thread_time:.2f}초")
    print(f"멀티프로세싱: {process_time:.2f}초")
    print(f"멀티프로세싱이 {thread_time/process_time:.2f}배 빠름")

compare_approaches()

실무 체크리스트

멀티스레딩 적용 전 확인사항

  • 작업 특성 분석: I/O 대기 시간이 전체의 30% 이상인가?
  • 데이터 의존성: 작업들이 서로 독립적인가?
  • 스레드 안전성: 공유 자원에 대한 동기화가 적절한가?
  • 성능 측정: 실제 성능 향상이 있는가?
  • 리소스 관리: 스레드 풀 크기가 적절한가?

디버깅 팁

import threading
import logging

# 스레드별 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(message)s'
)

def debug_task(task_id):
    """디버깅이 용이한 작업 함수"""
    thread_name = threading.current_thread().name
    logging.info(f"작업 {task_id} 시작 (스레드: {thread_name})")

    # 실제 작업 수행
    time.sleep(1)

    logging.info(f"작업 {task_id} 완료 (스레드: {thread_name})")
    return f"작업 {task_id} 결과"

결론

파이썬의 GIL은 확실히 멀티스레딩의 성능을 제한하지만, 이것이 곧 "멀티스레딩이 무의미하다"는 것을 의미하지는 않습니다.

핵심 인사이트

  1. 작업의 성격이 중요: CPU bound vs I/O bound 작업을 구분하여 접근
  2. GIL 해제 시점 활용: 파일 I/O, 네트워크 요청 등에서 멀티스레딩의 이점 극대화
  3. 적절한 도구 선택: 상황에 따라 멀티스레딩, 멀티프로세싱, asyncio 중 선택
  4. 성능 측정의 중요성: 실제 환경에서의 성능 측정을 통한 검증 필수

실무 적용 가이드

  • 데이터 전처리: 파일 I/O가 많은 경우 멀티스레딩 고려
  • 웹 크롤링: 네트워크 요청이 주된 작업인 경우 멀티스레딩 적극 활용
  • API 서버: I/O bound 작업이 많은 웹 서버에서 효과적
  • 배치 처리: 데이터베이스 I/O와 결합된 작업에서 성능 향상 기대

이제 여러분도 면접에서 "GIL 때문에 멀티스레딩이 효과가 없다"는 질문에 대해 보다 정확하고 깊이 있는 답변을 할 수 있을 것입니다. 무엇보다 실제 프로젝트에서 적절한 동시성 전략을 선택하는 데 도움이 되길 바랍니다.

728x90