새로운 프로젝트를 진행하면서 부동산 지수를 선형 회귀 알고리즘을 통해 계산해야 했다.
머신러닝 알고리즘은 GPU 연산을 통해 matrix 연산 등을 빠르게 처리할 수 있다.
문제가 발생한 부분은 그 커다란 matrix 를 어떻게 넣어야 하는지 에서 발생했다.
실제 한국 부동산의 지수를 알아내기 위해 알고리즘에 필요한 matrix 의 크기는 [6800만 ~ 1억 5천만] by [123 (2014년 1월 부터 현재까지 개월 수)] 였다.
코드로 보면,
num_months = len(date_range)
num_pairs = len(df_pairs)
dummy_matrix = lil_matrix((num_pairs, num_months))
print('더미 행렬.shape = ', dummy_matrix.shape)
for i, group_id, PrevYearMonth, RecentYearMonth, LogPriceDiff in df_pairs.itertuples():
if i % 50000 == 0:
print(i)
initial_period = initial_year_month.to_period('M')
prev_period = PrevYearMonth.to_period('M')
recent_period = RecentYearMonth.to_period('M')
prev_month_idx = (prev_period.year - initial_period.year) * 12 + (prev_period.month - initial_period.month)
recent_month_idx = (recent_period.year - initial_period.year) * 12 + (recent_period.month - initial_period.month)
dummy_matrix[i, prev_month_idx] = -1
dummy_matrix[i, recent_month_idx] = 1
df_fairs 는 6800만 ~ 1억 5천만 의 데이터를 갖고 있는 Pandas Dataframe 이다. 이 데이터를 순회하며 matrix 를 초기화하고 있다.
이 부분은 6800만개의 데이터로 돌릴 경우에는 20분에 결쳐 연산되지만, 데이터가 1억개가 넘어가면 메모리 문제인지 급격하게 느려지다가 Process Kill 된다.
문제점
데이터가 커서 메모리를 많이 먹는다.
연산 자체가 느리다.
해결방안
병렬 처리?
먼저 여러 개의 프로세스로 병렬처리 하기 보다는 가벼운 쓰레드로 처리하는 것이 효율적일 것이라고 생각했다.
코드로 살펴보자.
num_months = len(date_range)
num_pairs = len(df_pairs)
dummy_matrix = lil_matrix((num_pairs, num_months))
print('더미 행렬.shape = ', dummy_matrix.shape)
def init(start_idx, end_idx):
global dummy_matrix
for i in range(start_idx, end_idx):
row = df_pairs.iloc[i]
PrevYearMonth = row['PrevYearMonth']
RecentYearMonth = row['RecentYearMonth']
prev_year, prev_month = PrevYearMonth.year, PrevYearMonth.month
recent_year, recent_month = RecentYearMonth.year, RecentYearMonth.month
prev_month_idx = (prev_year - initial_year) * 12 + (prev_month - initial_month)
recent_month_idx = (recent_year - initial_year) * 12 + (recent_month - initial_month)
dummy_matrix[i, prev_month_idx] = -1
dummy_matrix[i, recent_month_idx] = 1
num_threads = 10
step = num_pairs // num_threads
threads = []
for i in range(num_threads):
start_idx = i * step
if i == num_threads - 1:
end_idx = num_pairs # 마지막 스레드는 남은 모든 데이터를 처리
else:
end_idx = (i + 1) * step
thread = Thread(target=init, args=(start_idx, end_idx))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
쓰레드는 메모리 자원을 공유하므로 인덱스만 분리하여 같은 matrix 를 각각 초기화해줄 수 있도록 했다.
실행 결과 20분 걸리던 작업이 17분 걸리게 단축되긴 했으나, 의도한만큼의 성능이 나오지는 않는다.
데이터가 1억건이 넘어갈 경우 마찬가지로 수행이 Process Kill 이 나버린다.
실행 결과 20분 걸리던 작업이 17분 걸리게 단축되긴 했으나, 의도한만큼의 성능이 나오지는 않는다.
파이썬에는 GIL(Global Interpreter Lock) 이라는 것이 존재하고, CPyhton 으로 파이썬 코드를 실행시킬 경우 GIL이 동작한다.
GIL 이란 동시성에서 race condition 을 방지하기 위해 존재하는 것인데 단순하게 같은 라인의 바이트코드를 여러 쓰레드가 동시에 수행할 수 없도록 막는 방법으로 동시성 문제를 해결하고 있다.
따라서, 쓰레드를 분리하는 경우 어차피 바이트코드 자체를 동시에 수행할 수 없으므로 병렬 처리의 효율성이 떨어진다.
그러면 CPython 으로 안돌리면 되는거 아니냐?
-> 머신러닝 알고리즘을 돌리기 위한 다양한 라이브러리를 사용하고 있는데 대부분 CPython 기반 라이브러리이다. GIL 을 사용하지 않는 pypy 에서는 해당 라이브러리를 지원하지 않아 라이브러리 의존 코드를 없애거나 pypy 를 지원하는 라이브러리로 교체해야하는데, 이 작업도 쉽지 않다.
데이터가 1억건이 넘어갈 경우 마찬가지로 수행이 Process Kill 이 나버린다.
쓰레드는 메모리를 공유해서 사용하고, 메모리 관련 작업은 하지 않았으므로 당연함..
여러 개의 프로세스를 사용해 처리해보자.
num_months = len(date_range)
num_pairs = len(df_pairs)
dummy_matrix = lil_matrix((num_pairs, num_months))
print('더미 행렬.shape = ', dummy_matrix.shape)
def init(df_split, start_idx):
for i in range(len(df_split)):
row = df_split.iloc[i]
PrevYearMonth = row['PrevYearMonth']
RecentYearMonth = row['RecentYearMonth']
prev_year, prev_month = PrevYearMonth.year, PrevYearMonth.month
recent_year, recent_month = RecentYearMonth.year, RecentYearMonth.month
prev_month_idx = (prev_year - initial_year) * 12 + (prev_month - initial_month)
recent_month_idx = (recent_year - initial_year) * 12 + (recent_month - initial_month)
dummy_matrix[start_idx + i, prev_month_idx] = -1
dummy_matrix[start_idx + i, recent_month_idx] = 1
n_cores = 10
step = len(df_pairs) // n_cores
pool = Pool(processes=n_cores)
df_splits = [df_pairs.iloc[i*step:(i+1)*step] for i in range(n_cores)]
if len(df_pairs) % n_cores != 0:
df_splits[-1] = df_pairs.iloc[(n_cores-1)*step:] # 마지막 분할에 나머지 포함
start_indices = [i * step for i in range(n_cores)]
pool.starmap(init, zip(df_splits, start_indices))
pool.close()
pool.join()
연산이 매우 빨라졌다. 20분 -> 3분
그러나, matrix 수정이 발생하지 않았다.
그러나, matrix 수정이 발생하지 않았다.
프로세스는 별도의 메모리를 할당받아 연산을 수행한다.
dummy_matrix 는 프로세스끼리 공유하는 메모리가 아니라 각각 할당해 사용하는 변수이므로 각자 초기화해봤자 메인 프로세스에서 값을 사용할 수 없다.
그러면 프로세스끼리 메모리 공유하면 되는거 아니냐?
-> 사용 중인 lil_matrix 는 단순한 배열이 아니므로 쉽게 공유 메모리에 올리기 어렵다. + 안그래도 메모리가 터지는데 공유 메모리로 사용하면 마찬가지로 메모리 문제가 발생할 것 같다.
해결방안
각각 프로세스가 진행한 연산을 파일로 저장해두고 실제 알고리즘을 돌릴 때 사용하자.
num_months = len(date_range)
num_pairs = len(df_pairs)
dummy_matrix = lil_matrix((num_pairs, num_months))
print('더미 행렬.shape = ', dummy_matrix.shape)
def init(df_split, start_idx):
matrix = lil_matrix((len(df_split), num_months))
for i in range(len(df_split)):
row = df_split.iloc[i]
PrevYearMonth = row['PrevYearMonth']
RecentYearMonth = row['RecentYearMonth']
prev_year, prev_month = PrevYearMonth.year, PrevYearMonth.month
recent_year, recent_month = RecentYearMonth.year, RecentYearMonth.month
prev_month_idx = (prev_year - initial_year) * 12 + (prev_month - initial_month)
recent_month_idx = (recent_year - initial_year) * 12 + (recent_month - initial_month)
matrix[i, prev_month_idx] = -1
matrix[i, recent_month_idx] = 1
save_npz(f'matrix_{start_idx}.npz', matrix)
n_cores = 10
step = len(df_pairs) // n_cores
pool = Pool(processes=n_cores)
df_splits = [df_pairs.iloc[i*step:(i+1)*step] for i in range(n_cores)]
if len(df_pairs) % n_cores != 0:
df_splits[-1] = df_pairs.iloc[(n_cores-1)*step:] # 마지막 분할에 나머지 포함
start_indices = [i * step for i in range(n_cores)]
pool.starmap(init, zip(df_splits, start_indices))
pool.close()
pool.join()
위 코드는 20분 -> 4분 으로 시간이 많이 단축되었고, 1억 건의 데이터에서도 충분히 돌아갔다.
각 프로세스는 자신이 맡은 matrix 만 초기화하고 파일에 저장하면 되고, 멀티 프로세스를 사용하므로 CPU 를 각각 사용해서 빠르게 동작한다.
어차피 데이터는 1억건이라서 똑같이 dummy_matrix 를 사용하면 메모리 사용하는 건 똑같은거 아니냐?
-> 정확히는 측정을 해보지 않아 모르겠다. 뇌피셜로는 dummy_matrix 에 값이 비어있기 전에는 주소값만 가지면 되고 초기화 로직을 실행할때는 값을 들고 있어서 그렇지 않나 싶다. for 문 내에서 특정 count 연산을 수행하면 가비지 컬렉션을 강제로 동작시키거나 다른 방법으로 해결할 수도 있을 것 같다.