분할 정복

0. 들어가며

  • 분할 정복 유형의 알고리즘
    • 주어진 문제를 작은 부분으로 나누고
    • 나눠진 각 부분의 솔루션을 구하고
    • 각 부분 문제의 솔루션을 합쳐서 전체 문제에 대한 솔루션을 구하는 방식
  • 이진 검색, 퀵 정렬, 병합 정렬, 행렬 곱셈, 고속 푸리에 변환, 스카이라인 알고리즘

1. 이진 검색

1) 이진 검색

  • 주어진 시퀀스가 정렬되어 있을 경우 N값이 있는 지 확인
    • 전체 시퀀스 범위를 range로 설정
    • 현재 range의 가운데 원소를 M이라고 하고 M과 N 비교
    • M=N이면 시퀀스에서 N을 찾았음 → 검색 종료
    • 그렇지 않다면 2가지 규칙에 따라 range 수정
      • N<M이면 range에서 M 오른쪽에 있는 모든 원소 제거
      • N>M이면 range에서 M 왼쪽에 있는 모든 원소 제거
    • range에 한 개 보다 많은 원소가 남아있다면 2단계로 이동
    • 그렇지 않으면 주어진 시퀀스에 N이 존재하지 않음 → 검색 종료

2) 연습 문제 18: 이진 검색 구현 및 성능 평가

#include <iostream>
#include <vector>
#include <chrono>
#include <random>
#include <algorithm>

bool linear_search(int N, std::vector<int>& S)
{
	for (auto i : S)
	{
		if (i == N)
			return true; // 원소를 찾음!
	}

	return false;
}

bool binary_search(int N, std::vector<int>& S)
{
	auto first = S.begin();
	auto last = S.end();

	while (true)
	{
		// 현재 검색 범위의 중간 원소를 mid_element에 저장
		auto range_length = std::distance(first, last);
		auto mid_element_index = std::floor(range_length / 2);
		auto mid_element = *(first + mid_element_index);

		// mid_element와 N 값을 비교
		if (mid_element == N)
			return true;
		else if (mid_element > N)
			std::advance(last, -mid_element_index);
		else
			std::advance(first, mid_element_index);

		// 현재 검색 범위에 하나의 원소만 남아 있다면 false를 반환
		if (range_length == 1)
			return false;
	}
}

void run_small_search_test()
{
	auto N = 2;
	std::vector<int> S {1, 3, 2, 4, 5, 7, 9, 8, 6};

	std::sort(S.begin(), S.end());

	if (linear_search(N, S))
		std::cout << "선형 검색으로 원소를 찾았습니다!" << std::endl;
	else
		std::cout << "선형 검색으로 원소를 찾지 못했습니다." << std::endl;

	if (binary_search(N, S))
		std::cout << "이진 검색으로 원소를 찾았습니다!" << std::endl;
	else
		std::cout << "이진 검색으로 원소를 찾지 못했습니다." << std::endl;
}

void run_large_search_test(int size, int N)
{
	std::vector<int> S;
	std::random_device rd;
	std::mt19937 rand(rd());

	// [1, size] 범위에서 정수 난수 발생
	std::uniform_int_distribution<std::mt19937::result_type> uniform_dist(1, size);

	// S에 난수 추가
	for (auto i = 0; i < size; i++)
		S.push_back(uniform_dist(rand));

	std::sort(S.begin(), S.end());

	// 검색 시간 측정 시작
	std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();

	bool search_result = binary_search(N, S);

	// 검색 시간 측정 종료
	std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();

	auto diff = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);
	std::cout << "이진 검색 수행 시간: " << diff.count() << std::endl;

	if (search_result)
		std::cout << "원소를 찾았습니다." << std::endl;
	else
		std::cout << "원소를 찾지 못했습니다." << std::endl;
}

int main()
{
	run_small_search_test();

	run_large_search_test(100000, 36543);
	run_large_search_test(1000000, 36543);
	run_large_search_test(10000000, 36543);

	return 0;
}

3) 실습 문제 8: 예방 접종

#include <iostream>
#include <vector>
#include <chrono>
#include <random>
#include <algorithm>

class Student
{
private:
	std::pair<int, int> name;	// 학생 ID, <이름, 성>
	bool vaccinated;			// 예방 접종 여부

public:
	// 생성자
	Student(std::pair<int, int> n, bool v) :
		name(n), vaccinated(v) {}

	// 정보 검색
	auto get_name() { return name; }
	auto is_vaccinated() { return vaccinated; }

	// 이름이 같으면 같은 사람으로 취급	
	bool operator ==(const Student& p) const
	{
		return this->name == p.name;
	}

	// 이름을 이용하여 정렬하도록 설정
	bool operator< (const Student& p) const
	{
		return this->name < p.name;
	}

	bool operator> (const Student& p) const
	{
		return this->name > p.name;
	}
};

// 1부터 max 사이의 ID를 갖는 임의의 학생 정보를 생성합니다.
auto generate_random_Student(int max)
{
	std::random_device rd;
	std::mt19937 rand(rd());

	// 학생 ID 범위는 [1, max]로 지정
	std::uniform_int_distribution<std::mt19937::result_type> uniform_dist(1, max);

	// 임의의 학생 정보 생성
	auto random_name = std::make_pair(uniform_dist(rand), uniform_dist(rand));
	bool is_vaccinated = uniform_dist(rand) % 2 ? true : false;

	return Student(random_name, is_vaccinated);
}

bool needs_vaccination(Student P, std::vector<Student>& people)
{
	auto first = people.begin();
	auto last = people.end();

	while (true)
	{
		auto range_length = std::distance(first, last);
		auto mid_element_index = std::floor(range_length / 2);
		auto mid_element = *(first + mid_element_index);

		// 목록에 해당 학생이 있고, 예방 접종을 받지 않았다면 true를 반환
		if (mid_element == P && mid_element.is_vaccinated() == false)
			return true;
		// 목록에 해당 학생이 있는데 이미 예방 접종을 받았다면 false를 반환
		else if (mid_element == P && mid_element.is_vaccinated() == true)
			return false;
		else if (mid_element > P)
			std::advance(last, -mid_element_index);
		if (mid_element < P)
			std::advance(first, mid_element_index);

		// 목록에 해당 학생이 없다면 true를 반환
		if (range_length == 1)
			return true;
	}
}

void search_test(int size, Student p)
{
	std::vector<Student> people;

	// 임의의 학생 정보 목록 생성
	for (auto i = 0; i < size; i++)
		people.push_back(generate_random_Student(size));

	std::sort(people.begin(), people.end());

	// 이진 검색 실행 및 시간 측정
	auto begin = std::chrono::steady_clock::now();

	bool search_result = needs_vaccination(p, people);

	auto end = std::chrono::steady_clock::now();
	auto diff = std::chrono::duration_cast<std::chrono::microseconds>(end - begin);

	std::cout << "이진 검색 소요 시간: " << diff.count() << "us" << std::endl;

	if (search_result)
		std::cout << "(" << p.get_name().first << " " << p.get_name().second << ") "
		<< "학생은 예방 접종이 필요합니다." << std::endl;
	else
		std::cout << "(" << p.get_name().first << " " << p.get_name().second << ") "
		<< "학생은 예방 접종이 필요하지 않습니다." << std::endl;
}

int main()
{
	// 임의의 학생 정보 생성
	auto p = generate_random_Student(1000);

	search_test(1000, p);
	search_test(10000, p);
	search_test(100000, p);

	return 0;
}

2. 분할 정복 이해하기

  • 문제의 규모가 커서 한 번에 해결하기 어려운 경우
  • 작은 부분 문제로 나누어 해결하는 방식
  1. 분할 : 주어진 문제를 동일한 방식으로 해결할 수 있는 여러 부분 문제로 나눔
  2. 정복 : 각 부분 문제에 대한 해답을 구함
  3. 결합 : 각 부분 문제에 대한 해답을 결합하여 전체 문제에 대한 해답을 구함

3. 분할 정복을 이용한 정렬 알고리즘

  • 정렬 알고리즘의 필요 요구사항
    • 모든 데이터 타입에 대해 동작
    • 많은 양의 데이터를 처리
    • 점근적 시간 복잡도 측면이나 실제 동작 시에 빠르게 동작

1) 병합 정렬

  • 많은 원소로 구성된 전체 집합을 작은 크기의 부분 집합으로 나누어 각각 정렬하고 정렬된 부분 집합을 오름차순 또는 내림차순 순서를 유지하며 합치는 방식

2) 연습 문제 19: 병합 정렬

#include<iostream>
#include<vector>

template<typename T>
std::vector<T> merge(std::vector<T>& arr1, std::vector<T>& arr2)
{
	std::vector<T> merged;
	auto iter1 = arr1.begin();
	auto iter2 = arr2.begin();
	
	while(iter1!=arr1.end() && iter2!=arr2.end())
	{
		if(*iter1<*iter2)
		{
			merged.emplace_back(*iter1);
			iter1++;
		}
		else
		{
			merged.emplace_back(*iter2);
			iter2++;
		}
	}
	if(iter1!=arr1.end())
	{
		for(;iter1!=arr1.end();iter1++)
			merged.emplace_back(*iter1);
	}
	else
	{
		for(;iter2!=arr2.end();iter2++)
			merged.emplace_back(*iter2);
	}
	return merged;
}

template<typename T>
std::vector<T> merge_sort(std::vector<T> arr)
{
	if(arr.size() >1)
	{
		auto mid = size_t(arr.size() / 2);
		auto left_half = merge_sort<T>(std::vector<T>(arr.begin(),arr.begin()+mid));
		auto right_half = merge_sort<T>(std::vector<T>(arr.begin()+mid, arr.end()));
		return merge<T>(left_half, right_half);
	}
	return arr;
}

template<typename T>
void print_vector(std::vector<T> arr)
{
	for(auto i : arr)
		std::cout<<i;
	std::cout<<std::endl;
}

void run_merge_sort_test()
{
	std::vector<int>    S1 {45, 1, 3, 1, 2, 3, 45, 5, 1, 2, 44, 5, 7};
	std::vector<float>  S2 {45.6f, 1.0f, 3.8f, 1.01f, 2.2f, 3.9f, 45.3f, 5.5f, 
		                    1.0f, 2.0f, 44.0f, 5.0f, 7.0f};
	std::vector<double> S3 {45.6, 1.0, 3.8 , 1.01, 2.2, 3.9, 45.3, 5.5, 
		                    1.0, 2.0, 44.0, 5.0, 7.0};
	std::vector<char>   C {'b', 'z', 'a', 'e', 'f', 't', 'q', 'u', 'y'};

	std::cout << "정렬되지 않은 입력 벡터:" << std::endl;
	print_vector<int>(S1);
	print_vector<float>(S2);
	print_vector<double>(S3);
	print_vector<char>(C);
	std::cout << std::endl;

	auto sorted_S1 = merge_sort<int>(S1);
	auto sorted_S2 = merge_sort<float>(S2);
	auto sorted_S3 = merge_sort<double>(S3);
	auto sorted_C = merge_sort<char>(C);

	std::cout << "병합 정렬에 의해 정렬된 벡터:" << std::endl;
	print_vector<int>(sorted_S1);
	print_vector<float>(sorted_S2);
	print_vector<double>(sorted_S3);
	print_vector<char>(sorted_C);
	std::cout << std::endl;
}

int main()
{
	run_merge_sort_test();
	return 0;
}

3) 퀵 정렬

  • 평균 실행 시간을 줄이는 목적
  • 핵심 연산은 병합이 아닌 분할

(1) 퀵 정렬의 분할 연산 방법

  1. 입력 배열을 두 개의 부분 배열 L과 R로 나눔, L은 입력 배열에서 P(피벗)보다 작거나 같은 원소를 포함하는 부분 배열 / R은 입력 배열에서 P(피벗)보다 큰 원소를 포함하는 부분 배열
  2. 입력 배열을 L,P,R 순서로 재구성

4) 연습 문제 20: 퀵 정렬

#include<iostream>
#include<vector>

template <typename T>
auto partition(typename std::vector<T>::iterator begin, typename std::vector<T>::iterator end)
{
	auto pivot_val = *begin;
	auto left_iter = begin + 1;
	auto right_iter = end;

	while(true)
	{
		while(*left_iter <= pivot_val && std::distance(left_iter, right_iter) > 0)
			left_iter++;
		while(*right_iter > pivot_val && std::distance(left_iter, right_iter) > 0)
			right_iter--;
		if(left_iter==right_iter)
			break;
		else
			std::iter_swap(left_iter, right_iter);
	}
	if(pivot_val > *right_iter)
		std::iter_swap(begin, right_iter);
	return right_iter;
}

template<typename T>
void quick_sort(typename std::vector<T>::iterator begin, typename std::vector<T>::iterator last)
{
	if(std::distance(begin, last)>=1)
	{
		auto partition_iter = partition<T>(begin,last);
		quick_sort<T>(begin, partition_iter-1);
		quick_sort<T>(partition_iter, last);
	}
}

template<typename T>
void print_vector(std::vector<T> arr)
{
	for(auto i : arr)
		std::cout<<i<<" ";
	std::cout<<std::endl;
}

void run_quick_sort_test()
{
	std::vector<int>    S1 {45, 1, 3, 1, 2, 3, 45, 5, 1, 2, 44, 5, 7};
	std::vector<float>  S2 {45.6f, 1.0f, 3.8f, 1.01f, 2.2f, 3.9f, 45.3f, 5.5f, 1.0f, 2.0f, 44.0f, 5.0f, 7.0f};
	std::vector<double> S3 {45.6, 1.0, 3.8 , 1.01, 2.2, 3.9, 45.3, 5.5, 1.0, 2.0, 44.0, 5.0, 7.0};
	std::vector<char>   C {'b', 'z', 'a', 'e', 'f', 't', 'q', 'u', 'y'};

	std::cout << "정렬되지 않은 입력 벡터:" << std::endl;
	print_vector<int>(S1);
	print_vector<float>(S2);
	print_vector<double>(S3);
	print_vector<char>(C);
	std::cout << std::endl;

	// arr.end()는 맨 마지막 원소 다음을 가리키므로 arr.end() - 1 형태로 호출합니다.
	quick_sort<int>(S1.begin(), S1.end() - 1);
	quick_sort<float>(S2.begin(), S2.end() - 1);
	quick_sort<double>(S3.begin(), S3.end() - 1);
	quick_sort<char>(C.begin(), C.end() - 1);

	std::cout << "퀵 정렬 수행 후의 벡터:" << std::endl;
	print_vector<int>(S1);
	print_vector<float>(S2);
	print_vector<double>(S3);
	print_vector<char>(C);
	std::cout << std::endl;
}

int main()
{
	run_quick_sort_test();
	return 0;
}

5) 실습 문제 9: 부분 정렬

#include <iostream>
#include <vector>
#include <chrono>
#include <random>
#include <algorithm>

template <typename T>
auto partition(typename std::vector<T>::iterator begin,
	typename std::vector<T>::iterator end)
{
	auto pivot_val = *begin;
	auto left_iter = begin + 1;
	auto right_iter = end;

	while (true)
	{
		// 벡터의 첫 번째 원소부터 시작하여 피벗보다 큰 원소를 찾습니다.
		while (*left_iter <= pivot_val && std::distance(left_iter, right_iter) > 0)
			left_iter++;

		// 벡터의 마지막 원소부터 시작하여 역순으로 피벗보다 작은 원소를 찾습니다.
		while (*right_iter > pivot_val && std::distance(left_iter, right_iter) > 0)
			right_iter--;

		// 만약 left_iter와 right_iter가 같다면 교환할 원소가 없음을 의미합니다.
		// 그렇지 않으면 left_iter와 right_iter가 가리키는 원소를 서로 교환합니다.
		if (left_iter == right_iter)
			break;
		else
			std::iter_swap(left_iter, right_iter);
	}

	if (pivot_val > *right_iter)
		std::iter_swap(begin, right_iter);

	return right_iter;
}

template <typename T>
void partial_quick_sort(typename std::vector<T>::iterator begin,
	typename std::vector<T>::iterator last,
	size_t k)
{
	// 벡터에 하나 이상의 원소가 남아 있는 경우
	if (std::distance(begin, last) >= 1)
	{
		// 분할 연산을 수행
		auto partition_iter = partition<T>(begin, last);

		// 분할 연산에 의해 생성된 두 벡터를 재귀적으로 정렬
		partial_quick_sort<T>(begin, partition_iter - 1, k);
		
		// 만약 마지막 피벗 위치가 k보다 작으면 오른쪽 부분 벡터도 정렬
		if(std::distance(begin, partition_iter) < k)
			partial_quick_sort<T>(partition_iter, last, k);
	}
}

template <typename T>
void quick_sort(typename std::vector<T>::iterator begin,
	typename std::vector<T>::iterator last)
{
	// 만약 벡터에 하나 이상의 원소가 있다면
	if (std::distance(begin, last) >= 1)
	{
		// 분할 작업을 수행
		auto partition_iter = partition<T>(begin, last);

		// 분할 작업에 의해 생성된 벡터를 재귀적으로 정렬
		quick_sort<T>(begin, partition_iter - 1);
		quick_sort<T>(partition_iter, last);
	}
}

template <typename T>
void print_vector(std::vector<T> arr)
{
	for (auto i : arr)
		std::cout << i << " ";

	std::cout << std::endl;
}

// [1, size] 범위에 속하는 임의의 정수를 생성합니다. 
template <typename T>
auto generate_random_vector(T size)
{
	std::vector<T> V;
	V.reserve(size);

	std::random_device rd;
	std::mt19937 rand(rd());

	std::uniform_int_distribution<std::mt19937::result_type> uniform_dist(1, size);

	for (T i = 0; i < size; i++)
		V.push_back(uniform_dist(rand));

	return std::move(V);
}

template <typename T>
void test_partial_quicksort(size_t size, size_t k)
{
		// 임의의 벡터를 생성하고, 복사본을 만들어 두 알고리즘에 각각 테스트 진행.
		auto random_vec = generate_random_vector<T>(size);
		auto random_vec_copy(random_vec);

		std::cout << "입력 벡터: "<<std::endl;
		print_vector<T>(random_vec); 
			
		// 부분 퀵 정렬 알고리즘 수행 시간 측정
		auto begin1 = std::chrono::steady_clock::now();
		partial_quick_sort<T>(random_vec.begin(), random_vec.end() - 1, k);
		auto end1 = std::chrono::steady_clock::now();
		auto diff1 = std::chrono::duration_cast<std::chrono::microseconds>(end1 - begin1);

		std::cout << std::endl;
		std::cout << "부분 퀵 정렬 수행 시간: " << diff1.count() << "us" << std::endl;
	
		std::cout << "(처음 " << k << "개 원소만) 부분 정렬된 벡터: ";
		print_vector<T>(random_vec);

		// 전체 퀵 정렬 알고리즘 수행 시간 측정
		auto begin2 = std::chrono::steady_clock::now();
		quick_sort<T>(random_vec_copy.begin(), random_vec_copy.end() - 1);
		auto end2 = std::chrono::steady_clock::now();
		auto diff2 = std::chrono::duration_cast<std::chrono::microseconds>(end2 - begin2);

		std::cout << std::endl;
		std::cout << "전체 퀵 정렬 수행 시간: " << diff2.count() << "us" << std::endl;
	
		std::cout << "전체 정렬된 벡터: ";
		print_vector<T>(random_vec_copy);
}

int main()
{
	test_partial_quicksort<unsigned>(100, 10);
	return 0;
}

6) 선형 시간 선택

  • 각 단계에서 두 개 이상으로 분할하는 것이 유리한 경우
  1. 입력 벡터 v 가 주어지면 여기서 i번째로 작은 원소를 찾으려함
  2. 입력 벡터를 V1,V2,V3~Vn/5으로 분할
  3. 각각의 Vi를 정렬
  4. 각각의 Vi에서 중앙값 m을 구하고 이를 모아 집합 M을 생성
  5. 집합 M에서 중앙값 q를 찾음
  6. q를 피벗 삼아 전체 벡터 V를 L과 R로 분할
  7. 분할된 벡터로 i 원소 찾기
    1. 만약 i == k 이면, V의 i번째 원소
    2. i < k이면, V=L로 설정하고 1단계로 이동
    3. i > k이면, V=R로 설정하고 1단계로 이동

7) 연습 문제 21: 선형 시간 선택

#include<iostream>
#include<vector>

template<typename T>
auto find_median(typename std::vector<T>::iterator begin,typename std::vector<T>::iterator last)
{
	quick_sort<T>(begin,last);
	return begin+(std::distance(begin, last)/2);
}

template<typename T>
auto partition_using_given_pivot(typename std::vector<T>::iterator begin, typename std::vector<T>::iterator end, typename std::vector<T>::iterator pivot)
{
	auto left_iter = begin;
	auto right_iter = end;

	while(true)
	{
		while(*left_iter < *pivot && left_iter != right_iter)
			left_iter++;
		while(*right_iter >= *pivot && left_iter != right_iter)
			right_iter--;
		if(left_iter==right_iter)
			break;
		else
			std::iter_swap(left_iter, right_iter);
	}
	if(*pivot > *right_iter)
		std::iter_swap(begin, right_iter);
	return right_iter;
}

template<typename T>
typename std::vector<T>::iterator linear_time_select(typename std::vector<T>::iterator begin, typename std::vector<T>::iterator last, size_t i)
{
	auto size = std::distance(begin,last);
	if(size  > 0 && i < size)
	{
		auto num_Vi=(size+4)/5;
		size_t j=0;
	
		std::vector<T> M;
		for(;j<size/5;j++)
		{
			auto b = begin + (j*5);
			auto l = begin + (j*5) + 5;
			M.push_back(*find_median<T>(b,l));
		}
		auto median_of_medians = (M.size()==1) ? M.begin() : linear_time_select<T>(M.begin(), M.end() - 1, M.size() / 2);
		auto partition_iter = partition_using_given_pivot<T>(begin, last, median_of_medians);
		auto k = std::distance(begin, partition_iter) + 1;
		if(i==k)
			return partition_iter;
		else if(i<k)
			return linear_time_select<T>(begin, partition_iter-1, i);
		else if(i>k)
			return linear_time_select<T>(partition_iter+1, last, i-k);
	}
	else
	{
		return begin;
	}
}

template<typename T>
std::vector<T> merge(std::vector<T>& arr1, std::vector<T>& arr2)
{
	std::vector<T> merged;
	auto iter1 = arr1.begin();
	auto iter2 = arr2.begin();
	
	while(iter1!=arr1.end() && iter2!=arr2.end())
	{
		if(*iter1<*iter2)
		{
			merged.emplace_back(*iter1);
			iter1++;
		}
		else
		{
			merged.emplace_back(*iter2);
			iter2++;
		}
	}
	if(iter1!=arr1.end())
	{
		for(;iter1!=arr1.end();iter1++)
			merged.emplace_back(*iter1);
	}
	else
	{
		for(;iter2!=arr2.end();iter2++)
			merged.emplace_back(*iter2);
	}
	return merged;
}

template<typename T>
std::vector<T> merge_sort(std::vector<T> arr)
{
	if(arr.size() >1)
	{
		auto mid = size_t(arr.size() / 2);
		auto left_half = merge_sort<T>(std::vector<T>(arr.begin(),arr.begin()+mid));
		auto right_half = merge_sort<T>(std::vector<T>(arr.begin()+mid, arr.end()));
		return merge<T>(left_half, right_half);
	}
	return arr;
}
template <typename T>
void print_vector(std::vector<T> arr)
{
	for (auto i : arr)
		std::cout << i << " ";

	std::cout << std::endl;
}

void run_linear_select_test()
{
	std::vector<int> S1 {45, 1, 3, 1, 2, 3, 45, 5, 1, 2, 44, 5, 7};

	std::cout << "입력 벡터:" << std::endl;
	print_vector<int>(S1);

	std::cout << "정렬된 벡터:" << std::endl;
	print_vector<int>(merge_sort<int>(S1));

	std::cout << "3번째 원소: " << *linear_time_select<int>(S1.begin(), S1.end() - 1, 3) << std::endl;
	std::cout << "5번재 원소: " << *linear_time_select<int>(S1.begin(), S1.end() - 1, 5) << std::endl;
	std::cout << "11번째 원소: " << *linear_time_select<int>(S1.begin(), S1.end() - 1, 11) << std::endl;
}

int main()
{
	run_linear_select_test();
	return 0;
}

4. 분할 정복 기법과 C++ 표준 라이브러리 함수

binary_search() 이진 검색을 이옹하여 컨테이너에서 원소 하나를 찾음
search() 컨테이너에서 일련의 원소를 찾음
upper_bound() / lower_bound() 컨테이너에서 주어진 값 보다 큰/작은 원소가 나타나기 시작하는 위치의 반복자를 반환
partition() 분할 연산 수행 (피벗을 중심으로 작은 원소는 왼쪽 / 큰 원소는 오른쪽)
sort() 컨테이너 원소를 정렬
merge() 두 개의 입력 컨테이너를 합침 / 원소 순서는 변경되지않음
nth_element() n번째로 작은 원소를 반환
reduce() 지정한 범위의 원소에 대해 reduce 연산을 수행하고 결과 반환

5. 맵리듀스 : 더 높은 추상화 레벨의 분할 정복 기법

1) 맵과 리듀스 추상화

  • 맵 : 컨테이너 C를 입력으로 받아 컨테이너의 모든 원소에 함수 $f(x)$를 적용하는 연산
  • 리듀스 : 컨테이너 C의 모든 원소 $x$에 함수 $f(acc,x)$ 를 적용하여 하나의 값으로 축약하는 연산

2) 연습 문제 22: C++ 표준 라이브러리를 이용하여 맵과 리듀스 구현하기

#include<iostream>
#include<vector>
#include<algorithm>
#include<numeric>
#include<cmath>

void transform_test(std::vector<int> S)
{
	std::vector<int> Tr;
	std::cout<<"[맵 테스트]"<<std::endl;
	std::cout<<"입력배열, S:";
	for(auto i : S)
		std::cout<<i<<" ";
	std::cout<<std::endl;

	std::transform(S.begin(),S.end(),std::back_inserter(Tr),[](int x) {return std::pow(x,2.0);});
	std::cout<<"std::transform(), Tr: ";
	for(auto i : Tr)
		std::cout<<i<<" ";
	std::cout<<std::endl;

	std::for_each(S.begin(),S.end(),[](int& x) {x = std::pow(x,2.0);});
	std::cout<<"std::for_each(), S: ";
	for(auto i : S)
		std::cout<<i<<" ";
	std::cout<<std::endl;
}

void reduce_test(std::vector<int> S)
{
	std::cout<<"[리듀스 테스트]"<<std::endl;
	std::cout<<"입력배열, S:";
	for(auto i : S)
		std::cout<<i<<" ";
	std::cout<<std::endl;

	auto ans = std::accumulate(S.begin(),S.end(),0,[](int acc, int x) {return acc+x;});
	std::cout<<"std::accumulate(), ans: "<<ans<<std::endl;
}

int main()
{
	std::vector<int> S = {1,10,4,7,3,5,6,9,8,2};
	transform_test(S);
	reduce_test(S);
}

3) 맵리듀스 프레임워크를 이용한 부분 통합

  • 맵과 리듀스 연산을 확장성 있고 분산 처리가 가능하게 만듦 → 컴퓨터 클러스터에서 동작 가능, 연산 시간을 크게 단축

4) 연습 문제 23: 맵리듀스를 사용하여 소수 확인하기

// Copyright (c) 2009-2016 Craig Henderson
// <https://github.com/cdmh/mapreduce>
// The prime number code is based on work from Christian Henning [chhenning@gmail.com]

// Adapted by Payas Rajan <payasrajan@live.com>

#include 
#include "mapreduce.hpp"

namespace prime_calculator {

// 주어진 number가 소수인지를 확인
bool const is_prime(long const number)
{
	if (number > 2)
	{
		if (number % 2 == 0)
			return false;

		long const n = std::abs(number);
		long const sqrt_number = static_cast(std::sqrt(static_cast(n)));

		for (long i = 3; i <= sqrt_number; i += 2)
		{
			if (n % i == 0)
				return false;
		}
	}
	else if (number == 0 || number == 1)
		return false;

	return true;
}

// 정수 생성 클래스. first부터 last까지 step 간격의 정수를 생성.
template
class number_source : mapreduce::detail::noncopyable
{
public:
	number_source(long first, long last, long step)
		: sequence_(0), first_(first), last_(last), step_(step) {}

	bool const setup_key(typename MapTask::key_type& key)
	{
		key = sequence_++;
		return (key * step_ <= last_);
	}

	bool const get_data(typename MapTask::key_type const& key,
		typename MapTask::value_type& value)
	{
		typename MapTask::value_type val;

		val.first = first_ + (key * step_);
		val.second = std::min(val.first + step_ - 1, last_);

		std::swap(val, value);
		return true;
	}

private:
	long       sequence_;
	long const step_;
	long const last_;
	long const first_;
};

// 맵 연산
struct map_task : public mapreduce::map_task<long, std::pair<long,="" long=""> >
{
	template
	void operator()(Runtime& runtime, key_type const& /*key*/, value_type const& value) const
	{
		for (key_type loop = value.first; loop <= value.second; ++loop)
			runtime.emit_intermediate(is_prime(loop), loop);
	}
};

// 리듀스 연산
struct reduce_task : public mapreduce::reduce_task<bool, long="">
{
	template
	void operator()(Runtime& runtime, key_type const& key, It it, It ite) const
	{
		if (key)
			std::for_each(it, ite, std::bind(&Runtime::emit, &runtime, true, std::placeholders::_1));
	}
};

typedef mapreduce::job<prime_calculator::map_task, prime_calculator::reduce_task,="" mapreduce::null_combiner,="" prime_calculator::number_source<prime_calculator::map_task="">> job;

} // namespace prime_calculator

int main(int argc, char* argv[])
{
	mapreduce::specification spec;

	int prime_limit = 1000;

	// 스레드 개수 설정
	int cpu_cores = std::max(1U, std::thread::hardware_concurrency());
	spec.map_tasks = cpu_cores;
	spec.reduce_tasks = cpu_cores;

	// 지정한 범위의 정수 생성을 위한 객체
	prime_calculator::job::datasource_type datasource(0, prime_limit, prime_limit / spec.reduce_tasks);

	std::cout << "0부터 " << prime_limit << " 사이의 정수에서 소수 판별:" << std::endl;
	std::cout << "CPU 코어 개수: " << cpu_cores << std::endl;

	// 맵리듀스 실행
	prime_calculator::job job(datasource, spec);
	mapreduce::results result;

	job.run<mapreduce::schedule_policy::cpu_parallel >(result);

	std::cout << "맵리듀스 실행 시간: " << result.job_runtime.count() << " sec." << std::endl;
	std::cout << "검출된 소수 개수: " << std::distance(job.begin_results(), job.end_results()) << std::endl;

	// 결과 출력
	for (auto it = job.begin_results(); it != job.end_results(); ++it)
		std::cout << it->second << " ";
	std::cout << std::endl;

	return 0;
}

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// 
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// 
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
</mapreduce::schedule_policy::cpu_parallel</prime_calculator::map_task,></bool,></long,></payasrajan@live.com>

5) 실습 문제 10: 맵리듀스를 이용하여 워드카운트 구현하기

// Copyright (c) 2009-2016 Craig Henderson
// <https://github.com/cdmh/mapreduce>

#include "mapreduce.hpp"
#include 
#include  

namespace wordcount {

struct map_task : public mapreduce::map_task<
	std::string,                             // MapKey (filename)
	std::pair>  // MapValue (memory mapped file contents)
{
	template
	void operator()(Runtime& runtime, key_type const& key, value_type& value) const
	{
		bool in_word = false;
		char const* ptr = value.first;
		char const* end = ptr + value.second;
		char const* word = ptr;

		for (; ptr != end; ++ptr)
		{
			char const ch = std::toupper(*ptr, std::locale::classic());

			if (in_word)
			{
				if ((ch < 'A' || ch > 'Z') && ch != '\\'')
				{
					runtime.emit_intermediate(std::pair(word, ptr - word), 1);
					in_word = false;
				}
			}
			else if (ch >= 'A' && ch <= 'Z')
			{
				word = ptr;
				in_word = true;
			}
		}

		if (in_word)
		{
			assert(ptr > word);
			runtime.emit_intermediate(std::pair(word, ptr - word), 1);
		}
	}
};

template
struct reduce_task : public mapreduce::reduce_task<keytype, unsigned="">
{
	using typename mapreduce::reduce_task<keytype, unsigned="">::key_type;

	template
	void operator()(Runtime& runtime, key_type const& key, It it, It const ite) const
	{
		runtime.emit(key, std::accumulate(it, ite, 0));
	}
};

}   // namespace wordcount

template<>
inline uintmax_t const mapreduce::length(std::pair const& string)
{
	return string.second;
}

template<>
inline char const* const mapreduce::data(std::pair const& string)
{
	return string.first;
}

template<>
inline
size_t const
mapreduce::hash_partitioner::operator()(
	std::pair const& key,
	size_t partitions) const
{
	return boost::hash_range(key.first, key.first + key.second) % partitions;
}

namespace {

template
double const sum(T const& durations)
{
	double sum = 0.0;
	for (auto& chrono : durations)
		sum += chrono.count();
	return sum;
}

void write_stats(mapreduce::results const& result)
{
	if (result.map_times.size() == 0 || result.reduce_times.size() == 0)
		return;

	std::cout << std::endl << "\\nMapReduce statistics:";
	std::cout << "\\n  MapReduce job runtime                     : " << result.job_runtime.count() << "s of which...";
	std::cout << "\\n    Map phase runtime                       : " << result.map_runtime.count() << "s";
	std::cout << "\\n    Reduce phase runtime                    : " << result.reduce_runtime.count() << "s";
	std::cout << "\\n\\n  Map:";
	std::cout << "\\n    Total Map keys                          : " << result.counters.map_keys_executed;
	std::cout << "\\n    Map keys processed                      : " << result.counters.map_keys_completed;
	std::cout << "\\n    Map key processing errors               : " << result.counters.map_key_errors;
	std::cout << "\\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
	std::cout << "\\n    Fastest Map key processed in            : " << std::min_element(result.map_times.cbegin(), result.map_times.cend())->count() << "s";
	std::cout << "\\n    Slowest Map key processed in            : " << std::max_element(result.map_times.cbegin(), result.map_times.cend())->count() << "s";
	std::cout << "\\n    Average time to process Map keys        : " << sum(result.map_times) / double(result.map_times.size()) << "s";

	std::cout << "\\n\\n  Reduce:";
	std::cout << "\\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
	std::cout << "\\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
	std::cout << "\\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
	std::cout << "\\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
	std::cout << "\\n    Number of Result Files                  : " << result.counters.num_result_files;
	if (result.reduce_times.size() > 0)
	{
		std::cout << "\\n    Fastest Reduce key processed in         : " << std::min_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
		std::cout << "\\n    Slowest Reduce key processed in         : " << std::max_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
		std::cout << "\\n    Average time to process Reduce keys     : " << sum(result.reduce_times) / double(result.map_times.size()) << "s";
	}
	std::cout << std::endl;
}

std::ostream& operator<<(std::ostream& o, std::pair const& str)
{
	std::copy(str.first, str.first + str.second, std::ostream_iterator(o, ""));
	return o;
}

template
void write_frequency_table(Job const& job)
{
	flush(std::cout);

	auto it = job.begin_results();
	auto ite = job.end_results();
	if (it != ite)
	{
		std::cout << std::endl << "맵리듀스 결과:" << std::endl;

		using frequencies_t = std::list;
		frequencies_t frequencies;
		frequencies.push_back(*it);
		auto it_smallest = frequencies.rbegin();
		for (++it; it != ite; ++it)
		{
			if (frequencies.size() < 1000)    // show top 10
			{
				frequencies.push_back(*it);
				if (it->second < it_smallest->second)
					it_smallest = frequencies.rbegin();
			}
			else if (it->second > it_smallest->second)
			{
				*it_smallest = *it;

				it_smallest = std::min_element(
					frequencies.rbegin(),
					frequencies.rend(),
					mapreduce::detail::less_2nd);
			}
		}

		frequencies.sort(mapreduce::detail::greater_2nd);
		for (auto& freq : frequencies)
			std::cout << freq.first << "\\t" << freq.second << std::endl;
	}
}

}   // anonymous namespace

int main(int argc, char* argv[])
{
	std::cout << "맵리듀스를 이용한 워드카운트 프로그램";
	if (argc < 2)
	{
		std::cerr << "사용법: wordcount <디렉토리> [num_map_tasks]\\n";
		return 1;
	}

	// 입력 디렉토리 이름으로부터 맵리듀스 specification을 준비
	mapreduce::specification spec;
	spec.input_directory = argv[1];

	spec.map_tasks = std::max(1U, std::thread::hardware_concurrency());
	spec.reduce_tasks = std::max(1U, std::thread::hardware_concurrency());

	std::cout << std::endl << std::max(1U, std::thread::hardware_concurrency()) 
		<< "개의 CPU 코어 사용" << std::endl;

	// job을 생성하고 실행
	using Job= mapreduce::job<wordcount::map_task, wordcount::reduce_task<std::string="">>;
	try
	{
		mapreduce::results result;
		typename Job::datasource_type datasource(spec);

		Job job(datasource, spec);
		std::cout << "워드카운트를 위한 맵리듀스 병렬 실행..." << std::endl;
		job.run<mapreduce::schedule_policy::cpu_parallel >(result);

		std::cout << "맵리듀스 종료" << std::endl;

		write_stats(result);
		write_frequency_table(job);
	}
	catch (std::exception& e)
	{
		std::cout << "\\nError: " << e.what();
	}

	return 0;
}

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// 
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// 
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
</mapreduce::schedule_policy::cpu_parallel</wordcount::map_task,></keytype,></keytype,>

 

 

출처 : 코딩 테스트를 위한 자료 구조와 알고리즘 with C++ (길벗)