운영체제
동기화란?
여러 프로세스나 스레드가 공유 자원에 안전하게 접근하도록 순서를 조정하는 것
동기화가 필요한 이유
- 공유 자원 : 여러 프로세스가 동일한 자원 사용
- 경쟁 상태 : 동시 접근 시 예상치 못한 결과 발생
- 데이터 일관성 : 정확한 결과 보장 필요
임계 영역 (Critical Section)
공유 자원에 접근하는 코드 영역
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 임계 영역 문제 예시
balance = 1000
def withdraw(amount):
global balance
# 임계 영역 시작
temp = balance
temp = temp - amount
balance = temp
# 임계 영역 종료
# 두 스레드가 동시에 실행되면?
# Thread 1: withdraw(100)
# Thread 2: withdraw(200)
# 예상: balance = 700
# 실제: balance = 800 또는 900 (경쟁 상태 발생)
임계 영역 해결 조건
- 상호 배제 : 한 번에 하나의 프로세스만 임계 영역 진입
- 진행 : 임계 영역에 아무도 없다면 진입 허용
- 한정 대기 : 무한정 대기하지 않음
동기화 기법
뮤텍스 (Mutex)
상호 배제를 위한 가장 기본적인 동기화 기법
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
class BankAccount:
def __init__(self, initial_balance):
self.balance = initial_balance
self.lock = threading.Lock() # 뮤텍스
def withdraw(self, amount):
with self.lock: # 임계 영역 보호
if self.balance >= amount:
temp = self.balance
# 다른 작업 시뮬레이션
import time
time.sleep(0.1)
self.balance = temp - amount
return True
return False
def deposit(self, amount):
with self.lock:
temp = self.balance
time.sleep(0.1)
self.balance = temp + amount
# 사용 예시
account = BankAccount(1000)
def make_withdrawal():
result = account.withdraw(100)
print(f"출금 결과: {result}, 잔액: {account.balance}")
# 여러 스레드에서 동시 출금
threads = []
for i in range(5):
thread = threading.Thread(target=make_withdrawal)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
세마포어 (Semaphore)
정해진 개수만큼의 자원에 대한 접근 제어
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
import time
class DatabasePool:
def __init__(self, max_connections):
self.semaphore = threading.Semaphore(max_connections)
self.connections = []
def get_connection(self):
self.semaphore.acquire() # 자원 획득
print(f"연결 획득: {threading.current_thread().name}")
return "DB Connection"
def release_connection(self, connection):
print(f"연결 해제: {threading.current_thread().name}")
self.semaphore.release() # 자원 해제
def worker(db_pool, worker_id):
conn = db_pool.get_connection()
# 데이터베이스 작업 시뮬레이션
time.sleep(2)
db_pool.release_connection(conn)
# 최대 3개 연결만 허용하는 DB 풀
db_pool = DatabasePool(3)
# 10개의 워커가 동시에 DB 접근 시도
threads = []
for i in range(10):
thread = threading.Thread(target=worker, args=(db_pool, i), name=f"Worker-{i}")
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
조건 변수 (Condition Variable)
특정 조건이 만족될 때까지 대기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import threading
import queue
class ProducerConsumer:
def __init__(self, max_size):
self.buffer = queue.Queue(maxsize=max_size)
self.condition = threading.Condition()
self.running = True
def producer(self, producer_id):
item_count = 0
while self.running:
with self.condition:
# 버퍼가 가득 찰 때까지 기다림
while self.buffer.full() and self.running:
print(f"Producer {producer_id}: 버퍼 가득참, 대기 중...")
self.condition.wait()
if not self.running:
break
item = f"item_{producer_id}_{item_count}"
self.buffer.put(item)
item_count += 1
print(f"Producer {producer_id}: {item} 생산")
# 소비자에게 알림
self.condition.notify_all()
time.sleep(0.5)
def consumer(self, consumer_id):
while self.running:
with self.condition:
# 버퍼가 비어있을 때까지 기다림
while self.buffer.empty() and self.running:
print(f"Consumer {consumer_id}: 버퍼 비어있음, 대기 중...")
self.condition.wait()
if not self.running:
break
item = self.buffer.get()
print(f"Consumer {consumer_id}: {item} 소비")
# 생산자에게 알림
self.condition.notify_all()
time.sleep(1)
# 사용 예시
pc = ProducerConsumer(5)
# 생산자 2개, 소비자 3개 생성
threads = []
for i in range(2):
thread = threading.Thread(target=pc.producer, args=(i,))
threads.append(thread)
thread.start()
for i in range(3):
thread = threading.Thread(target=pc.consumer, args=(i,))
threads.append(thread)
thread.start()
# 10초 후 종료
time.sleep(10)
pc.running = False
for thread in threads:
thread.join()
모니터 (Monitor)
클래스나 모듈 수준에서 동기화 제공
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Monitor:
def __init__(self):
self._lock = threading.RLock() # 재진입 가능한 락
self._conditions = {}
def __enter__(self):
self._lock.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
def wait(self, condition_name):
if condition_name not in self._conditions:
self._conditions[condition_name] = threading.Condition(self._lock)
self._conditions[condition_name].wait()
def signal(self, condition_name):
if condition_name in self._conditions:
self._conditions[condition_name].notify()
def signal_all(self, condition_name):
if condition_name in self._conditions:
self._conditions[condition_name].notify_all()
class BoundedBuffer:
def __init__(self, size):
self.buffer = []
self.size = size
self.monitor = Monitor()
def put(self, item):
with self.monitor:
while len(self.buffer) >= self.size:
self.monitor.wait('not_full')
self.buffer.append(item)
self.monitor.signal('not_empty')
def get(self):
with self.monitor:
while len(self.buffer) == 0:
self.monitor.wait('not_empty')
item = self.buffer.pop(0)
self.monitor.signal('not_full')
return item
교착상태 (Deadlock)
두 개 이상의 프로세스가 서로의 자원을 기다리며 무한히 대기하는 상태
교착상태 발생 조건
- 상호 배제 : 자원을 한 번에 하나의 프로세스만 사용
- 점유 대기 : 자원을 보유한 채 다른 자원 대기
- 비선점 : 자원을 강제로 빼앗을 수 없음
- 순환 대기 : 프로세스들이 순환적으로 자원 대기
교착상태 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time
# 교착상태 발생 예시
lock1 = threading.Lock()
lock2 = threading.Lock()
def worker1():
print("Worker1: lock1 획득 시도")
lock1.acquire()
print("Worker1: lock1 획득 완료")
time.sleep(1) # 다른 워커가 lock2를 획득할 시간을 줌
print("Worker1: lock2 획득 시도")
lock2.acquire() # 여기서 무한 대기
print("Worker1: lock2 획득 완료")
lock2.release()
lock1.release()
def worker2():
print("Worker2: lock2 획득 시도")
lock2.acquire()
print("Worker2: lock2 획득 완료")
time.sleep(1)
print("Worker2: lock1 획득 시도")
lock1.acquire() # 여기서 무한 대기
print("Worker2: lock1 획득 완료")
lock1.release()
lock2.release()
# 교착상태 발생!
# t1 = threading.Thread(target=worker1)
# t2 = threading.Thread(target=worker2)
# t1.start()
# t2.start()
교착상태 해결 방법
1. 예방 (Prevention)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 자원 순서 정하기로 순환 대기 방지
def safe_worker1():
# 항상 lock1 → lock2 순서로 획득
with lock1:
print("Worker1: lock1 획득")
time.sleep(1)
with lock2:
print("Worker1: lock2 획득")
# 작업 수행
def safe_worker2():
# 동일한 순서로 자원 획득
with lock1:
print("Worker2: lock1 획득")
time.sleep(1)
with lock2:
print("Worker2: lock2 획득")
# 작업 수행
2. 회피 (Avoidance) - 은행원 알고리즘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class BankersAlgorithm:
def __init__(self, processes, resources, allocation, max_need):
self.processes = processes
self.resources = resources
self.allocation = allocation
self.max_need = max_need
self.available = [resources[i] - sum(allocation[j][i]
for j in range(processes))
for i in range(len(resources))]
def is_safe_state(self):
work = self.available[:]
finish = [False] * self.processes
safe_sequence = []
for _ in range(self.processes):
found = False
for i in range(self.processes):
if not finish[i]:
need = [self.max_need[i][j] - self.allocation[i][j]
for j in range(len(self.resources))]
# 자원이 충분한지 확인
if all(need[j] <= work[j] for j in range(len(work))):
# 프로세스 완료 시뮬레이션
for j in range(len(work)):
work[j] += self.allocation[i][j]
finish[i] = True
safe_sequence.append(i)
found = True
break
if not found:
return False, []
return True, safe_sequence
def request_resource(self, process_id, request):
# 요청이 가능한지 확인
need = [self.max_need[process_id][j] - self.allocation[process_id][j]
for j in range(len(self.resources))]
if not all(request[j] <= need[j] for j in range(len(request))):
return False, "요청이 최대 필요량 초과"
if not all(request[j] <= self.available[j] for j in range(len(request))):
return False, "사용 가능한 자원 부족"
# 가상으로 할당해보기
for j in range(len(request)):
self.available[j] -= request[j]
self.allocation[process_id][j] += request[j]
# 안전 상태인지 확인
is_safe, sequence = self.is_safe_state()
if is_safe:
return True, f"안전 시퀀스: {sequence}"
else:
# 롤백
for j in range(len(request)):
self.available[j] += request[j]
self.allocation[process_id][j] -= request[j]
return False, "불안전 상태로 인한 거부"
# 사용 예시
banker = BankersAlgorithm(
processes=3,
resources=[10, 5, 7],
allocation=[[0, 1, 0], [2, 0, 0], [3, 0, 2]],
max_need=[[7, 5, 3], [3, 2, 2], [9, 0, 2]]
)
success, message = banker.request_resource(1, [1, 0, 2])
print(f"자원 요청 결과: {message}")
3. 탐지 및 복구 (Detection & Recovery)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class DeadlockDetector:
def __init__(self):
self.wait_for_graph = {} # 대기 그래프
def add_wait_relation(self, waiting_process, holding_process):
if waiting_process not in self.wait_for_graph:
self.wait_for_graph[waiting_process] = []
self.wait_for_graph[waiting_process].append(holding_process)
def has_cycle(self):
visited = set()
rec_stack = set()
def dfs(node):
if node in rec_stack:
return True # 순환 발견
if node in visited:
return False
visited.add(node)
rec_stack.add(node)
if node in self.wait_for_graph:
for neighbor in self.wait_for_graph[node]:
if dfs(neighbor):
return True
rec_stack.remove(node)
return False
for process in self.wait_for_graph:
if process not in visited:
if dfs(process):
return True
return False
def find_victim_process(self):
"""교착상태 해결을 위한 희생자 프로세스 선택"""
if not self.has_cycle():
return None
# 간단한 휴리스틱: 가장 적은 자원을 사용하는 프로세스
min_cost_process = min(self.wait_for_graph.keys())
return min_cost_process
# 사용 예시
detector = DeadlockDetector()
detector.add_wait_relation("P1", "P2")
detector.add_wait_relation("P2", "P3")
detector.add_wait_relation("P3", "P1") # 순환 형성
if detector.has_cycle():
victim = detector.find_victim_process()
print(f"교착상태 탐지! 희생자 프로세스: {victim}")
실제 프로그래밍에서의 동기화
Python asyncio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
class AsyncBankAccount:
def __init__(self, initial_balance):
self.balance = initial_balance
self.lock = asyncio.Lock()
async def withdraw(self, amount):
async with self.lock:
if self.balance >= amount:
# 비동기 I/O 시뮬레이션
await asyncio.sleep(0.1)
self.balance -= amount
return True
return False
async def deposit(self, amount):
async with self.lock:
await asyncio.sleep(0.1)
self.balance += amount
async def main():
account = AsyncBankAccount(1000)
# 동시 작업 실행
tasks = []
for i in range(5):
task = asyncio.create_task(account.withdraw(100))
tasks.append(task)
results = await asyncio.gather(*tasks)
print(f"최종 잔액: {account.balance}")
# asyncio.run(main())
JavaScript Promise와 동기화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class AsyncMutex {
constructor() {
this.locked = false;
this.queue = [];
}
async acquire() {
return new Promise((resolve) => {
if (!this.locked) {
this.locked = true;
resolve();
} else {
this.queue.push(resolve);
}
});
}
release() {
if (this.queue.length > 0) {
const resolve = this.queue.shift();
resolve();
} else {
this.locked = false;
}
}
}
class BankAccount {
constructor(initialBalance) {
this.balance = initialBalance;
this.mutex = new AsyncMutex();
}
async withdraw(amount) {
await this.mutex.acquire();
try {
if (this.balance >= amount) {
// 비동기 작업 시뮬레이션
await new Promise(resolve => setTimeout(resolve, 100));
this.balance -= amount;
return true;
}
return false;
} finally {
this.mutex.release();
}
}
}
// 사용 예시
const account = new BankAccount(1000);
Promise.all([
account.withdraw(100),
account.withdraw(200),
account.withdraw(150)
]).then(results => {
console.log('출금 결과:', results);
console.log('최종 잔액:', account.balance);
});
성능 고려사항
락 경합 최소화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 나쁜 예: 락 범위가 너무 큼
class BadExample:
def __init__(self):
self.data = {}
self.lock = threading.Lock()
def process_data(self, key, value):
with self.lock:
# 긴 계산 작업
result = expensive_computation(value)
# 파일 I/O
save_to_file(result)
# 데이터 저장
self.data[key] = result
# 좋은 예: 락 범위 최소화
class GoodExample:
def __init__(self):
self.data = {}
self.lock = threading.Lock()
def process_data(self, key, value):
# 락 밖에서 계산과 I/O 수행
result = expensive_computation(value)
save_to_file(result)
# 필요한 부분만 락으로 보호
with self.lock:
self.data[key] = result
락프리 프로그래밍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import queue
import threading
class LockFreeCounter:
def __init__(self):
self.value = 0
def increment(self):
# 원자적 연산 사용
while True:
current = self.value
new_value = current + 1
# Compare-And-Swap 시뮬레이션
if self.compare_and_swap(current, new_value):
break
def compare_and_swap(self, expected, new_value):
# 실제로는 하드웨어 지원 필요
if self.value == expected:
self.value = new_value
return True
return False
# 실제 Python에서는 queue.Queue 사용 권장
lock_free_queue = queue.Queue() # 내부적으로 락프리 구현
이 글은 저작권자의 CC BY 4.0 라이센스를 따릅니다.