전문 개발자가 아닌 상황에서 데이타 가공, 분석 또는 자동화된 테스트를 목적으로 파이썬을 주로 사용해왔습니다. 자연히 다수의 멀티프로세스를 사용할 일이 없었습니다. 하지만 이번에 랜덤 상황을 시뮬레이션 해볼 문제가 있어서 멀티 프로세싱 모듈을 사용해 보았습니다. 프로세스 간의 공유할 자원이 없어서 Lock이나 Pipe 등은 사용하지 않고 기본 기능만 사용했습니다. 이번 포스트는 공부한 내용을 기록하는 차원에서 만들었습니다.
전역 인터프리터 락, 그리고 멀티 프로세싱
파이썬은 전역 인터프리터 락(Global Interpreter Lock, GIL) 특성 때문에, 쓰레드 단위에서는 진정한 병렬성을 구현하기 어렵습니다. GIL 환경에서는 하나의 쓰레드가 자원을 독점하고, 다수의 쓰레드는 순차적으로 처리됩니다. 파이썬 초기에 대다수의 CPU가 코어가 하나였기 때문에 진정한 의미의 동시성이 중요하지 않았고, 또 GIL로 인해서 파이썬의 설계와 관리의 단순성을 유지할 수 있었다고 합니다. 여튼 파이썬에서는 병렬로 처리하기 위해서 멀티 프로세스 모듈을 이용해야 합니다. 다행히 쓰레드를 이용하는 모듈과 유사한 multiprocessing 이 있어서 별 다른 어려움없이 사용할 수 있습니다.
from multiprocessing import Pool, Process
https://docs.python.org/ko/3.8/library/multiprocessing.html
파이썬에서 multiprocessing 라이브러리를 통해서 병렬 처리를 쉽게 할 수 있습니다.
Process
import os
from multiprocessing import Process
def Myproc(num):
p_id = os.getpid()
mysum = sum( [i for i in range(num)])
return p_id, mysum
if __name__ == '__main__':
proc = Process(target=Myproc, args=(10, ))
proc.start()
proc.join()
Process에서 target에 실행할 모듈을 지정하고, agrs에서 변수를 넣으면 새로운 프로세스가 생성이 됩니다. 이후에 해당 process 객체를 start하면 target에 등록된 Myproc가 돌아갑니다. 하지만 이 상태로는 출력 내용도 확인이 안되고, 결과값을 받을 수가 없습니다. 프로세스간 데이타 공유는 Pipe나 공유 메모리를 이용해야 하는데, 조금 귀찮은 부분입니다.
마지막에 join의 기본 인자는 timeout 시간입니다. 관례적으로 join은 timeout 또는 process 종료까지 기다린다라는 의미라고 합니다. 부모 프로세스에서 파생된 하위 프로세스가 업무(?) 마치면 부모 프로세스에 합쳐진다는 개념 같은데, 조금 어렵네요. ㅎㅎ
Queue
프로세스의 결과를 가져오기 위해서 간단히 큐를 이용할 수 있습니다. 또한 Pipe나 공유 메모리를 이용할 수도 있습니다. 아래는 Queue를 이용하여 Myproc에서 연산한 결과를 출력하는 코드입니다.
import os
from multiprocessing import Process, Queue
def Myproc(num, que):
p_id = os.getpid()
mysum = sum( [i for i in range(num)])
que.put([p_id, mysum]) # process 결과값을 queue 넣기
if __name__ == '__main__':
que = Queue() # queue 생성
proc = Process(target=Myproc, args=(5, que ))
proc.start() # process 시작
result = que.get()
print(result)
proc.join() # process 종료
Pool
mapreduce 작업을 위해서 좀 더 간단한 Pool을 이용할 수 있습니다.
import os
from multiprocessing import Pool
def Myproc(l):
return os.getpid(), sum(l)
if __name__ == '__main__':
pool = Pool(processes = 2)
data = [i for i in range(1000000)]
split_data = [ data[:500000], data[500000:] ]
ret = pool.starmap(Myproc, zip(split_data, ))
print('ret:', ret)
Pool에서 사용할 process의 숫자를 지정합니다. 보통 본인의 논리 코어수가 적당합니다. map 메소드를 이용해서 process를 실행하고 결과값을 받습니다. 결과는 process 숫자만큼의 길이를 가지는 리스트에 저장됩니다. 아래는 위의 코드를 실행한 결과 입니다. 다른 pid를 가지는 프로세스에서 실행이 된 결과를 확인할 수 있습니다.
ret: [(14116, 124999750000), (14512, 374999750000)]
※ 기존에 map 대신에 python 3.3부터는 지원이 되는 starmap을 사용했습니다. 복수의 인자를 사용할 수 있습니다.
starmap(func, iterable[, chunksize])
map()과 같지만, iterable 의 요소가 인자로 언팩 될 이터러블일 것으로 기대합니다.
따라서 iterable 이 [(1,2), (3, 4)] 미면 결과는 [func(1,2), func(3,4)] 가 됩니다
https://docs.python.org/ko/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
윈도우와 Mac에서는 process 생성 기본 방법은 spawn입니다. process를 spawn 할 때, 새로운 인터프리터를 만들고 모듈을 새로 import 합니다. 변수와 객체도 복사되어서 부모 프로세스와 다른 ID를 가지게 됩니다. 다만, 실행 중인 쓰레드는 복사가 되지 않습니다. spawn과 fork의 자세한 차이는 아래 블로그에 잘 설명되어 있습니다.
https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/
'통계, 개발, 데이타 > Python' 카테고리의 다른 글
파이썬(python) 로깅하기 - import logging #1 (0) | 2021.07.10 |
---|---|
파이썬(python) cProfile 사용법 (0) | 2021.07.06 |
댓글