이번에는 futures에 대해서 알아보겠습니다.
futures는 요청들을 비동기적으로 수행하기 위한 인터페이스를 제공해 주는 모듈입니다.
Executor
concurrent.futures.Excutor 는 비동기 요청을 수행할 때 필요한 메써드들을 정의해 둔 추상 클래스 입니다. 여기에는 submit, map, shutdown 메써드들이 정의되어 있습니다.
submit | 수행될 요청을 제출하는 메써드입니다. 결과로 (뒤에서 설명할) Future 인스턴스를 반환합니다. |
map | 같은 function에 list형태로 정의된 매개변수들을 각각 전달해서 수행하는 경우 사용합니다. |
shutdown | executor에게 할당된 리소스를 정리하라는 signal을 보냅니다. shutdown이 호출된 executor에게 submit 혹은 map 를 호출하면, RuntimeError가 발생합니다. 참고: shutdown 을 호출했다고 해서, 수행 중인 모든 동작이 바로 멈추는 것은 아닙니다. 다만 cancel_futures 매개변수 값에 따라서, 아직 시작되지 않고 대기중인 요청을 바로 취소할 건지, 아니면 대기중인 요청까지 모두 끝나고 나서 리소스를 정리할지가 결정됩니다. |
python (버전 3.12.4 기준)에서 기본으로 제공되는 executor에는 ThreadPoolExecutor와 ProcessPoolExecutor 가 있습니다. 이름에서 유추할 수 있듯이, ThreadPoolExecutor는 요청을 수행할 때, ThreadPool을 사용하고, ProcessPoolExecutor는 요청을 수행할 때, ProcessPool을 사용합니다.
Process 와 Thread의 차이를 간략하게 정리하면, Process는 독립적으로 수행되며, 메모리 등의 리소스를 따로 할당해서 수행하게 됩니다. Thread는 Process 내에서 수행이 되며, 하나의 Process 안에서 수행되는 Thread들 간에는 리소스를 공유하게 됩니다.
따라서 ProcessPoolExecutor 의 경우, 요청들이 Process 단위로 수행되기 때문에, 수행시에 Process에게 리소스를 따로 할당되어야 하기 때문에, overhead가 발생됩니다. 반면 ThreadPoolExecutor의 경우, 리소스가 공유되기 때문에 리소스 할당에 따른 overhead는 없지만, 잘못 사용하게 되면 deadlock이 발생할 수 있습니다.
결론을 이야기하면, 요청 하나 하나의 수행 작업이 오래걸리는 경우 (CPU-bound tasks)에는 리소스 할당에 시간이 소요되더라도 ProcessPoolExecutor가 적합하고, 처리할 데이터가 많지만, 하나 하나의 수행 작업은 빠르게 처리되는 경우 (I/O bound tasks)에는 ThreadPoolExecutor를 사용하는 것이 좋습니다.
Future
executor에게 요청을 submit하면, Future 인스턴스를 반환해 줍니다. 이 Future 인스턴스를 통해서, executor에게 전달한 요청의 상태를 확인할 수 있습니다. 다음은 Future 객체에서 사용할 수 있는 method 들 입니다. (일부만 발췌했으며 전체 리스트를 확인하고 싶으시면 다음의 링크를 참하세요. https://docs.python.org/3/library/concurrent.futures.html)
canceled | 수행될 요청이 취소되었는지에 대한 결과를 돌려줍니다. |
running | 요청이 지금 현재 수행되고 있는지 상태를 돌려줍니다. |
done | 요청이 취소되었거나 완료되었으면 True를 돌려줍니다. |
cancel | 아직 대기 중인 요청에 한해서 요청을 취소합니다. |
result | 요청 수행에 대한 결과를 돌려줍니다. 만약 요청이 아직 끝나지 않았다면, 결과가 나올때까지 기다립니다. (block) |
Module functions
concurrent.futures에 정의되어 있는 futures 에서 사용하는 funciton들 입니다. future 요청은 한 가지 요청만 비동기로 처리하는 경우도 있지만, 보통의 경우 여러 요청을 동시에 처리하기 위해서 많이 사용되기 때문에, 전체 요청들에 대한 결과를 받아서 처리해야 하는 경우, 아래의 function들을 사용하게 됩니다.
wait | Future 인스턴스들의 요청이 끝날 때까지 기다립니다. timeout을 지정할 수도 있고, return_when argument를 이용해서 첫 번째 요청이 끝날때, 처음 exception 이 발생했을 때, 모든 요청이 다 처리되었을 때 등의 옵션을 지정할 수 있습니다. |
as_completed | Future 인스턴스들의 iterator를 반환해 줍니다. iterator는 generator처럼 동작합니다. 완료된 요청 순으로 결과가 나온 Futuer 인스턴스를 yield 해 줍니다. (비동기적으로 결과를 반환해 줍니다.) |
Futures 사용 예제
아래 코드는 간단히 작성한 사용 예제 입니다. (docs.python.org 에 있는 예시를 활용하였습니다.)
먼저 map을 이용해서 요청을 처리하는 예시입니다.
import concurrent.futures
import urllib.request
TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']
def load_url(url):
try:
with urllib.request.urlopen(url) as conn:
return url, conn.read(), None
except Exception as exc:
return url, None, str(exc)
with concurrent.futures.ThreadPoolExecutor() as executor:
for url, data, err in executor.map(load_url, TEST_URLS, timeout=60):
if data:
print('%r page is %d bytes' % (url, len(data)))
else:
print('%r generated an exception: %s' % (url, err))
map은 as_completed 와 비슷한데, 결과 iterator에는 Future 인스턴스가 아닌, 인스턴스의 result 들을 반환해 줍니다. map을 억지로 풀어쓴다면 다음과 같습니다.
with concurrent.futures.ThreadPoolExecutor() as executor:
for url, data, err in [f_instance.result() for f_instance in
concurrent.futures.as_completed([executor.submit(load_url, url) for url in TEST_URLS])]:
if data:
print('%r page is %d bytes' % (url, len(data)))
else:
print('%r generated an exception: %s' % (url, err))
여기서 이야기하고자 하는 바는, map의 경우에는 Future 인스턴스에서 result를 읽는 과정이 포함되어 있기 때문에, 특정요청에서 exception 이 발생하는 경우, iterator를 수행하는 과정에서 exception이 발생하게 됩니다. 이 exception으로 문제가 생기는 것을 막기 위해서는, 요청 내에서 exception이 발생하더라도 같은 포맷으로 결과를 리턴해 줄 수 있게 디자인해야 합니다.
위의 예시를 as_completed 를 사용해서 변경하면 다음과 같습니다.
import concurrent.futures
import urllib.request
TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in TEST_URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
여기서는 as_completed가 future instance 에서 결과를 받아오는 시점에서 exception을 처리할 수 있기 때문에 요청에서 Exception 이 발생한다고 해도, iterator에서 처리가 가능합니다.
마지막으로 wait을 사용하면, list에 future instance를 호출한 순서대로 저장하여, list를 이용해서 호출한 순서대로 결과를 처리할 수 있습니다. 참고로 wait function은 반환값으로 as_completed와 같이 처리된 순서대로 결과를 처리할 수 있는 generator 형태의 iterator를 돌려줍니다.
import concurrent.futures
import urllib.request
TEST_URLS = ['https://www.google.com', 'https://www.naver.com', 'https://www.tistory.com']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor() as executor:
future_list = [executor.submit(load_url, url, 60) for url in TEST_URLS]
concurrent.futures.wait(future_list)
for idx, future in enumerate(future_list):
url = TEST_URLS[idx]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))