반응형

이번에는 futures에 대해서 알아보겠습니다.

 

futures는 요청들을 비동기적으로 수행하기 위한 인터페이스를 제공해 주는 모듈입니다. 

 

 

Executor

concurrent.futures.Excutor 는 비동기 요청을 수행할 때 필요한 메써드들을 정의해 둔 추상 클래스 입니다. 여기에는 submit, map, shutdown 메써드들이 정의되어 있습니다.

 

submit 수행될 요청을 제출하는 메써드입니다. 결과로 (뒤에서 설명할) Future 인스턴스를 반환합니다.
map 같은 function에 list형태로 정의된 매개변수들을 각각 전달해서 수행하는 경우 사용합니다.
shutdown executor에게 할당된 리소스를 정리하라는 signal을 보냅니다.
shutdown이 호출된 executor에게 submit 혹은 map 를 호출하면, RuntimeError가 발생합니다.

참고: shutdown 을 호출했다고 해서, 수행 중인 모든 동작이 바로 멈추는 것은 아닙니다. 다만 cancel_futures 매개변수 값에 따라서, 아직 시작되지 않고 대기중인 요청을 바로 취소할 건지, 아니면 대기중인 요청까지 모두 끝나고 나서 리소스를 정리할지가 결정됩니다.

 

 

python (버전 3.12.4 기준)에서 기본으로 제공되는 executor에는 ThreadPoolExecutor와 ProcessPoolExecutor 가 있습니다. 이름에서 유추할 수 있듯이, ThreadPoolExecutor는 요청을 수행할 때, ThreadPool을 사용하고, ProcessPoolExecutor는 요청을 수행할 때, ProcessPool을 사용합니다.

Process 와 Thread의 차이를 간략하게 정리하면, Process는 독립적으로 수행되며, 메모리 등의 리소스를 따로 할당해서 수행하게 됩니다. Thread는 Process 내에서 수행이 되며, 하나의 Process 안에서 수행되는 Thread들 간에는 리소스를 공유하게 됩니다. 

따라서 ProcessPoolExecutor 의 경우, 요청들이 Process 단위로 수행되기 때문에, 수행시에 Process에게 리소스를 따로 할당되어야 하기 때문에, overhead가 발생됩니다. 반면 ThreadPoolExecutor의 경우, 리소스가 공유되기 때문에 리소스 할당에 따른 overhead는 없지만, 잘못 사용하게 되면 deadlock이 발생할 수 있습니다.

결론을 이야기하면, 요청 하나 하나의 수행 작업이 오래걸리는 경우 (CPU-bound tasks)에는 리소스 할당에 시간이 소요되더라도 ProcessPoolExecutor가 적합하고, 처리할 데이터가 많지만, 하나 하나의 수행 작업은 빠르게 처리되는 경우 (I/O bound tasks)에는 ThreadPoolExecutor를 사용하는 것이 좋습니다.

 

 

Future

executor에게 요청을 submit하면, Future 인스턴스를 반환해 줍니다. 이 Future 인스턴스를 통해서, executor에게 전달한 요청의 상태를 확인할 수 있습니다. 다음은 Future 객체에서 사용할 수 있는 method 들 입니다. (일부만 발췌했으며 전체 리스트를 확인하고 싶으시면 다음의 링크를 참하세요. https://docs.python.org/3/library/concurrent.futures.html)

 

canceled 수행될 요청이 취소되었는지에 대한 결과를 돌려줍니다.
running 요청이 지금 현재 수행되고 있는지 상태를 돌려줍니다.
done 요청이 취소되었거나 완료되었으면 True를 돌려줍니다. 
cancel 아직 대기 중인 요청에 한해서 요청을 취소합니다.
result 요청 수행에 대한 결과를 돌려줍니다. 만약 요청이 아직 끝나지 않았다면, 결과가 나올때까지 기다립니다. (block)

 

 

Module functions

concurrent.futures에 정의되어 있는 futures 에서 사용하는 funciton들 입니다. future 요청은 한 가지 요청만 비동기로 처리하는 경우도 있지만, 보통의 경우 여러 요청을 동시에 처리하기 위해서 많이 사용되기 때문에, 전체 요청들에 대한 결과를 받아서 처리해야 하는 경우, 아래의 function들을 사용하게 됩니다.

wait Future 인스턴스들의 요청이 끝날 때까지 기다립니다. timeout을 지정할 수도 있고, return_when argument를 이용해서 첫 번째 요청이 끝날때, 처음 exception 이 발생했을 때, 모든 요청이 다 처리되었을 때 등의 옵션을 지정할 수 있습니다.
as_completed Future 인스턴스들의 iterator를 반환해 줍니다. iterator는 generator처럼 동작합니다. 완료된 요청 순으로 결과가 나온 Futuer 인스턴스를 yield 해 줍니다. (비동기적으로 결과를 반환해 줍니다.) 

 

 

 

Futures 사용 예제

아래 코드는 간단히 작성한 사용 예제 입니다. (docs.python.org 에 있는 예시를 활용하였습니다.)

 

 

먼저 map을 이용해서 요청을 처리하는 예시입니다. 

import concurrent.futures
import urllib.request

TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']

def load_url(url):
    try:
        with urllib.request.urlopen(url) as conn:
            return url, conn.read(), None
    except Exception as exc:
        return url, None, str(exc)

with concurrent.futures.ThreadPoolExecutor() as executor:
    for url, data, err in executor.map(load_url, TEST_URLS, timeout=60):
        if data:
            print('%r page is %d bytes' % (url, len(data)))
        else:
            print('%r generated an exception: %s' % (url, err))

 

 

map은 as_completed 와 비슷한데, 결과 iterator에는 Future 인스턴스가 아닌, 인스턴스의 result 들을 반환해 줍니다. map을 억지로 풀어쓴다면 다음과 같습니다.

with concurrent.futures.ThreadPoolExecutor() as executor:
    for url, data, err in [f_instance.result() for f_instance in 
        concurrent.futures.as_completed([executor.submit(load_url, url) for url in TEST_URLS])]:
        if data:
            print('%r page is %d bytes' % (url, len(data)))
        else:
            print('%r generated an exception: %s' % (url, err))

 

 

여기서 이야기하고자 하는 바는, map의 경우에는 Future 인스턴스에서 result를 읽는 과정이 포함되어 있기 때문에, 특정요청에서 exception 이 발생하는 경우, iterator를 수행하는 과정에서 exception이 발생하게 됩니다. 이 exception으로 문제가 생기는 것을 막기 위해서는, 요청 내에서 exception이 발생하더라도 같은 포맷으로 결과를 리턴해 줄 수 있게 디자인해야 합니다.

 

위의 예시를 as_completed 를 사용해서 변경하면 다음과 같습니다.

import concurrent.futures
import urllib.request

TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in TEST_URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

 

 

여기서는 as_completed가 future instance 에서 결과를 받아오는 시점에서 exception을 처리할 수 있기 때문에 요청에서 Exception 이 발생한다고 해도, iterator에서 처리가 가능합니다.

 

 

마지막으로 wait을 사용하면, list에 future instance를 호출한 순서대로 저장하여, list를 이용해서 호출한 순서대로 결과를 처리할 수 있습니다. 참고로 wait function은 반환값으로 as_completed와 같이 처리된 순서대로 결과를 처리할 수 있는 generator 형태의 iterator를 돌려줍니다.

 

import concurrent.futures
import urllib.request

TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_list = [executor.submit(load_url, url, 60) for url in TEST_URLS]
    concurrent.futures.wait(future_list)
    for idx, future in enumerate(future_list):
        url = TEST_URLS[idx]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

 

 

반응형

+ Recent posts