본문 바로가기
data science/python

threading / multiprocessing / asyncio

by 꼰대코더 2024. 5. 2.

wikipedia에서 랜덤 100 페이지의 타이틀을 화일로 저장하는 샘플로서 multiprocessing과 async 를 이용하여 처리속도의 향상을 보여주고 있다.

  • multiprcessing : 계산등의 CPU 의존적인 처리에 유리 
  • async : 화일 I/O, 네트워크 I/O 등의 Blocking처리에 유리
  • 위의 둘을 이용하여 처리속도를 향상
import asyncio                                         # Gives us async/await
import concurrent.futures                        # Allows creating new processes
import time
from math import floor                              # Helps divide up our requests evenly across our CPU cores
from multiprocessing import cpu_count    # Returns our number of CPU cores

import aiofiles                                            # For asynchronously performing file I/O operations
import aiohttp                                            # For asynchronously making HTTP requests
from bs4 import BeautifulSoup                  # For easy webpage scraping

 

num_pages 만큼 wikipedia 페이지의 h1 타이틀을 output_file 에 append

async def get_and_scrape_pages(num_pages: int, output_file: str):
    async with \
    aiohttp.ClientSession() as client, \
    aiofiles.open(output_file, "a+", encoding="utf-8") as f:

        for _ in range(num_pages):
            async with client.get("https://en.wikipedia.org/wiki/Special:Random") as response:
                if response.status > 399:
                    # I was getting a 429 Too Many Requests at a higher volume of requests
                    response.raise_for_status()

                page = await response.text()
                soup = BeautifulSoup(page, features="html.parser")
                title = soup.find("h1").text

                await f.write(title + "\t")

        await f.write("\n")

 

asyncio.run 을 이용하여 i/o blocking중에 cpu에 다른 태스크를 양보

def start_scraping(num_pages: int, output_file: str, i: int):
    print(f"Process {i} starting...")
    asyncio.run(get_and_scrape_pages(num_pages, output_file))
    print(f"Process {i} finished.")

 

100 개를 cpu 수에 할당. 맨마지막은 (할당된 수 + 나머지 여분수)를 담당

def main():
    NUM_PAGES = 100 # Number of pages to scrape altogether
    NUM_CORES = cpu_count() # Our number of CPU cores (including logical cores)
    OUTPUT_FILE = "./wiki_titles.tsv" # File to append our scraped titles to

    PAGES_PER_CORE = floor(NUM_PAGES / NUM_CORES)
    PAGES_FOR_FINAL_CORE = PAGES_PER_CORE + NUM_PAGES % PAGES_PER_CORE # For our final core

 

Process Pool를 만들어 동시 처리 (or ThreadPoolExecutor)

    futures = []

    with concurrent.futures.ProcessPoolExecutor(NUM_CORES) as executor:
        for i in range(NUM_CORES - 1):
            new_future = executor.submit(
                start_scraping, # Function to perform
                # v Arguments v
                num_pages=PAGES_PER_CORE,
                output_file=OUTPUT_FILE,
                i=i
            )
            futures.append(new_future)

        futures.append(
            executor.submit(
                start_scraping,
                PAGES_FOR_FINAL_CORE, OUTPUT_FILE, NUM_CORES-1
            )
        )

    concurrent.futures.wait(futures)

 

'data science > python' 카테고리의 다른 글

Redis Pub/Sub  (1) 2024.09.09
Thread vs ThreadPool vs ThreadPoolExecutor  (0) 2024.05.11
image byte 데이터 <-> numpy string  (0) 2024.04.29
two list -> dict  (0) 2024.02.27
문자열 리스트 조작  (0) 2024.02.02