#
8. Synchronization: Advanced
#
Review
#
Semaphore
- non-negative global integer synchronization variable. :
int s
; (\geq0) - Manipulated by P and V operations. : semaphore variable을 바꿀 수 있는 방법 p, s 를 통해 manipulate
- manipulate :
int var
를 증가, 감소시킴 (+복잡)
- manipulate :
- Binary semaphore vs Counting semaphore
- initialize as 1 (binary) 5, 10
- semaphore의 초기 value는 critical session에 들어갈 수 있는 thread 개수
- critical session에는 딱 하나 thread 밖에 못 들어감
- Counting semaphore는 그 값을 5로 초기화해두었다면 5개 cricial session에 들어갈 수있음
- critical session에 들어가고 나서 : P를 통해 들어갈지 안들어갈지 보고,
- 내가 들어갔다
- =
Semaphore val = 1
- critical session에서 작업하고 나올 때 누군가가 내가 아직 critical session에서 있을 때 들어오려고 하면 P에서 blocked (
semaphore val = 0
) = - \rightarrow blocked, suspended
- =
- 내가 나올 때 unlock
- →
V
fn 호출한다.
- →
- 내가 들어갔다
#
P(s)
- s라는 variable이 0인지 아닌지 봄
- **s \neq 0 : 0이 아니면 → 1을 빼고 나서 끝난다**
- If s is nonzero, then decrement s by 1 and return immediately.
- Test and decrement operations occur atomically (indivisibly)
- If s is nonzero, then decrement s by 1 and return immediately.
- s=0 : 본인이 suspend된다
- If s is zero, then suspend thread until s becomes nonzero
and the thread is restarted by a V operation.
- 내가 이 var보고나서 이 함수 안에서 여러 thread중 누군가가 decrement했다는 것
→ 나는 그 값이 0보다 큰 값이 될 때까지 기다릴 것이다 (suspend, sleep)
- until 0보다 큰 값이 될 때 : var x를 0보다 크게 만든 다음 깨워줄 때까지 기다림 (signal 받을 때까지)
- 내가 이 var보고나서 이 함수 안에서 여러 thread중 누군가가 decrement했다는 것
→ 나는 그 값이 0보다 큰 값이 될 때까지 기다릴 것이다 (suspend, sleep)
- After restarting, the P operation decrements s and returns control to the caller.
- → 깨어나게 되면, s라는 variable을 decrement하고 진행됨
- If s is zero, then suspend thread until s becomes nonzero
and the thread is restarted by a V operation.
- (binary semaphore기억하고 설명) Semaphore가 1로 initialize -> 두 thread가
P/V
- critical session을 앞 뒤로 쌓아서 lock을 가져다 lock을 얻으면 들어가고 얻지 못하면 기다림
- 나올 땐 lock을 풀어주는 구조
- 이
P
라는 것은 semaphore value를 가져다 확인하고, 그 값이 0이 아니라면 decrement - 최초로 1로 되있을 때 들어간다고 하면 decrement하여 0으로 만들고, 그리고 critical session으로 들어감
- 만약에 들어가려 했는데 semaphore 값을 봤는데 0이다
- → blocked 된 상태로 남아있고 그 semaphore에 나는 wait하고 있다며 매달려 있음
- Process state transition diagram에서 보면 run 상태에서 block상태로
- 정리
- semaphore s 를 Decrement한다
- s \neq 0
- 그 값이 0보다 크면 decrement,
- s=0
- sleep하고 누군가가 깨워줄 때까지 suspend된다.
- = CPU Cycle을 소모하지 않는다.
- → 즉 0보다 크거나 같은 값을 계속 체크하지 않고 suspended되어 있다가 signal오면 깨어나는 형태
#
V(s)
:
V
의 정의 : 기다리고 있는 thread들이 재시작되는 순서를 정의하지 않는다.- 요구하는 유일한 사항은
V
가 정확히 한 개의 waiting thread를 restart- → 여러 개의 thread가 하나의 Semaphore를 가지고 있을 때 어떤 것이 V의 결과로 재시작되는지는 예측 불가능
- 요구하는 유일한 사항은
- Increment
s
by 1- s라는 variable을 increment해줌
- → 그리고 나서 만일 s variable을 기다리고 있던 suspend thread가 있다면 그 중 한명을 깨워주고
- → 그 thread가 decrement할수 있게 해줌
- Increment operation occurs atomically :
- atomic하게 increment시키고 누군가가 s라는 variable을 decrement하지 못해서 pending되어있는 경우 (suspended된 경우),
- → 여러 thread중 누군가 하나를 wake시켜서 decrement할 수 있게끔 한다.
- atomic하게 increment시키고 누군가가 s라는 variable을 decrement하지 못해서 pending되어있는 경우 (suspended된 경우),
- 정리하면,
- S를 increment하고, 이 semaphore value에 기다리고 있던 blocked 된 thread를 깨워줌
- semaphorevalue s 를 increment
- → 이 semaphore value의 기다리고 있떤 blocked된 thread를 깨워 줌
- → 여러 thread가 다같이 깨어남 : thundering herd
- block되어 기다리던 thread를 깨워 줌
- → 깨어나 ready 상태로 가 scheduler에 의해 행사되어 critical session으로 들어감
- ‘Atomically’
- P,V는 atomically 수행된다
- indivisibly - 쪼개질 수 없는 함수이다.
- = p라는 함수는 실행이 되거나, 안 되거나 둘 중 하나이다.
- variable이 뭔지 test하고 이 variable을 어떤 값으로 setting해줌 (decrement)
- v또한 Atomically increment해주기 때문에 변수를 하나 증가해줌
- P,V는 atomically 수행된다
- If there are any threads blocked in a P operation waiting for s to become non- zero, then restart exactly one of those threads, which then completes its P operation by decrementing s.
- test와 감소 연산은 semaphore s가 0이 아니면 s의 감소가 중단 없이 일어난다는 의미에서 개별적으로 일어남. V에서도 증가연산 개별적.
- Semaphore를 중단 없이 load, 증가, 저장하기 때문
- Semaphore invariant: (s >= 0)
#
Using semaphores to protect shared resources via mutual exclusion
- 여러 thread가 해당 코드를 구동한다고 생각했을 때
mutex = 1
P(mutex)
cnt++ // Critical Session
V(mutex)
- Mutex 값은 초기에 1 → 0으로 바꾸어 주고 thread 1번 : block → waiting(blocked, suspended 상태)
- critical session에서 나와 mutex 에서 thread 2번이 기다리고 있고 이를 깨워준다.
- 다시한번 mutex를 잡으려고 한다
- → 잡혀 critical session으로 들어간다
- Basic idea:
- Associate a unique semaphore mutex, initially 1, with each shared variable (or related set of shared variables)
- Surround each access to the shared variable(s) with P(mutex) and V(mutex) operations
#
Using Semaphores to Coordinate Access to Shared Resources
- shared resources = data structure coordinate tech : semaphore
- Basic idea: Thread uses a semaphore operation to notify another thread that some condition has become true
- Use counting semaphores to keep track of resource state and to notify other threads
- Use mutex to protect access to resource
- Two classic examples:
- The Producer-Consumer Problem
- The Readers-Writers Problem
#
Producer-Consumer Problem
#
producer-consumer problem :
- 생산자와 소비자 (여러 명의 생산자) (여러 명의 소비자) - front와 rear가 만나면 empty buffer
- classical한, 상당히 잘 사용되는 concurrency example.
- 생산자 : 무언가 item을 생산하고 있음 (Insert)
- item store @ Shared buffer (multiple slot)
- 생산자는 slot에다가 앞에서부터 하나씩 차곡차곡 쌓는다.
- 내가 어디까지 쌓았는지 확인하기 위해 circulared buffer : front/rear
- front는 아직 소비하지 않았기 때문에 맨 처음
- 소비자. 무언가 item을 소비하고 있음 (Remove)
- remove하게 되면 front 뒤로 이동 (상황 가정) 3명의 consumer가 있고 buffer가 empty되어 있다고 하자.
- 소비자는 살 수 있는 item이 있는가?
-> empty buffer이기 때문에 없다.
- 소비자는 item이 없으면 sleep하면 된다 (blocked되면 된다)
- 생산자 입장에서는 소비자가 소비 못하는 바람에 buffer가 없어 더 생산 불가능 -> 생산자는 그 때 sleep을 한다. 이런 특성을 가진 구조 semaphore :
- 내가 cs에 들어갈 수 있을 때
#
Concepts
big data system에서 backend에서 개발할 수 있는 중요한 technique
- Common synchronization pattern:
- Producer waits for empty slot, inserts item in buffer, and notifies consumer
- producer는 empty slot을 block된 상태로 기다림 (buffer 에 item insert위해서)
- producer는 buffer에 item을 넣고, semaphore 기다리는 consumer가 있다면 알려줌
- empty slot이 있다 → 생산자는 생산하면 되고
- Consumer waits for item, removes it from buffer, and notifies producer
- consumer : item 있는지 보고 있으면 buffer에서 remove하고 producer에게 알려서 새로 생산해도 된다고 하여 produce
- item이 buffer에 있다면 -> 소비하면 되고
- item이 없다면 -> producer가 produce할때까지 sleep
- Producer waits for empty slot, inserts item in buffer, and notifies consumer
semaphore의 또 다른 용도 : mutual exclusion 제공 + shared resource로의 접근 scheduling
- thread : semaphore 연산을 이용하여 program state의 조건이 true가 됐음을 다른 Thread에 알림
- Examples
- Multimedia processing:
- Producer creates MPEG video frames, consumer renders them
- coding algorithm에 따라 나오는 data
- Consumer thread가 렌더링
- 분배 : mpeg frame 생성 / consumer 렌더링 (분배)
- buffer :개별 Frame에서 data와 관련한 encoding, decoding 시간 차이로 발생한 video stream noise 제거 → producer에 slot 저장소 제공, consumer에게 encoded frame 저장소 제공
- Producer creates MPEG video frames, consumer renders them
- Event-driven graphical user interfaces
- Producer detects mouse clicks, mouse movements, and keyboard hits and inserts corresponding events in buffer 5 Shared buffer
- Consumer retrieves events from buffer and paints the display
- 마우스, 키보드 클릭 등 event를 받아 event buffer에 마구 집어넣는 producer -> 화면에다 display해줌
- Multimedia processing:
#
Producer-Consumer on an n-element Buffer
item을 추가하고 제거하는 것이 shared variable의 renew와 관련되어 있으므로 buffer에 접근할 때 mutex 보장 + buffer로의 접근 scheduling
- if full buffer → producer 는 slot available 할 때까지 대기
- if empty buffer → consumer 는 item available 할 때까지 대기
Requires a mutex and two counting semaphores (slot/item):
- mutex: enforces mutually exclusive access to the the buffer
- mutex : binary semaphore :0, 1
- mutually exclusive 오로지 하나 thread만 들어가도록
- mutex : binary semaphore :0, 1
- slot, items : counting semaphore
- 초기화 값에 따라 그만큼 들어갈 수 있음.
- slots: counts the available slots in the buffer
- slot : n개의 buffer라고 할 때
- ex. n=8이라고 하면 slot semaphore는 8로 초기화되어 8개까지는 동시에 CS에 들어가서 produce할 수 있다.
- slot : n개의 buffer라고 할 때
- items: counts the available items in the buffer
item : buffer 안 item의 개수
처음에는 empty였으니까 0으로 initialize
sbuf라는 shared buffer
- mutex: enforces mutually exclusive access to the the buffer
Implemented using a shared buffer package called S_{BUF}.
- limited buffer 조작
- mutex semaphore : mutex buffer approach 제공
- semaphore slot, item : counting semaphore - empty slot의 수, available item 수 확인
#
sbuf
Package
#
Declarations
// #include "csapp.h”
typedef struct
{
int *buf;/* Buffer array */
int n;/* Maximum number of slots */
int front;/* buf[(front+1)%n] is first item */
int rear;/* buf[rear%n] is last item */
sem_t mutex;/* Protects accesses to buf */
sem_t slots;/* Counts available items */
sem_t items;/* Counts available slots */
} sbuf_t;
void sbuf_init(sbuf_t *sp, int n);
void sbuf_deinit(sbuf_t *sp);
void sbuf_insert(sbuf_t *sp, int item);
int sbuf_remove(sbuf_t *sp);
#
Implementation
/* Create an empty, bounded, shared FIFO buffer with n slots */
void sbuf_init(sbuf_t *sp, int n)
{
sp->buf = Calloc(n, sizeof(int));
sp->n = n;/* Buffer holds max of n items */
sp->front = sp->rear = 0;/* Empty buffer iff front == rear */
Sem_init(&sp->mutex, 0, 1);/* Binary semaphore for locking */
Sem_init(&sp->slots, 0, n);/* Initially, buf has n empty slots */
Sem_init(&sp->items, 0, 0);/* Initially, buf has 0 items */
/* Clean up buffer sp */
}
// Clean up buffer sup
void sbuf_deinit(sbuf_t * sp) {
Free(sp->buf);
}
sbuf_init
- buffer를 위한 Heap memory 할당
- Front - rear가 empty heap을 가리키도록 설정
- 초기값을 semaphore에 할당
- 세 함수로 호출하기 전 한 번 호출
- 0으로 초기화 = 0개만큼 들어간다는 이야기는 incorrect
- Item은 있으면 thread에 넣는다.
sbuf_deinit
- 사용 마친 후 buffer space 반환
- ! (Semaphore 초기값) \neq (critical session에 들어가는 thread의 개수)
- binary semaphore : 1개 thread만
- counting semaphore : Buffer에 있는 element 개수만큼 생산할수 있기 때문에
/* Insert item onto the rear of shared buffer sp */
void sbuf_insert(sbuf_t *sp, int item)
{
P(&sp->slots); /* Wait for available slot */
P(&sp->mutex); /* Lock the buffer */
sp->buf[(++sp->rear) % (sp->n)] = item; /* Insert the item */
V(&sp->mutex); /* Unlock the buffer */
V(&sp->items); /* Announce available item */
}
- Insert할 때 (thread들어갈 때) slot의 개수를 본다
sbuf_insert
- available slot을 기다리고, Mutex lock
- Item insertion 후에 Mutex unlock한 후 new itemd이 available함을 알림
- ex. N=8
- → decrement : n=7
- 언제 block되냐 : 값이 0이 되어있을 때
- 8개가 다 들어가 있다 = slot 이 없다
- Insert 여러가지 thread가 떠서 insert를 하고 있기 때문에, 그 안에는 slot이 0이 될 때까지 mutex로 잡아 둠 = item을 잡아둘 때 rear ptr를 여러 thread가 움직이면 안됨
- producer도 하나씩 순차적으로 수행해야 함,
- 직렬화를 위해 mutex lock을 가지고 하나만 실행할 수 있도록.
- item이 생산되었으니 increment → 1 또는 2, 3, 4
- consumer: consume하기 전에 item에 해당하는 semaphore value를 봄
- 0이 아니면 decrement하여 들어감
- → decrement : n=7
/* Remove and return the first item from buffer sp */
int sbuf_remove(sbuf_t *sp)
{
int item;
P(&sp->items);/* Wait for available item */
P(&sp->mutex);/* Lock the buffer */
item = sp->buf[(++sp->front) % (sp->n)]; /* Remove the item */
V(&sp->mutex);/* Unlock the buffer */
V(&sp->slots);/* Announce available slot */
}
return item;
- Writer (producer) - reader (consumer)
- P(slot) / P(mutex) / CS / V(mutex) / V(item) → item을 통해 reader(consumer)중 p(item)
sbuf_remove
- item 기다린 후 mutex lock
- item : buffer front에서 remove
- mutex unlock후 available slot임을 알림
- remove
- item을 가지고 들어가고, 나올 때는 slot을 봄
- item이 0이 아니면 consume은 여러개가 같이 할 수 있음
- front에서 item을 하나 씩 빼내면 ptr을 옮기고 직렬화가 됨
→ mutex semaphore를 잡아 한 녀석 한 번에 처리할 수 있도록 → item을 뺐으니 slot이 생산
#
Readers-Writers Problem
- Readers- Writers Problem
- database = record 관리 : index를 tree로 관리
- 자료구조를 update할수도 reference할수도 있음
- database안에서는 이 문제를 해결하기 위한 solution으로 semaphore / mutex
#
Concepts
- Generalization of the mutual exclusion problem
- concurrent thread의 set : shared object에 approach
- ex. main memory data structure, disk database
- Problem statement:
- Reader threads only read the object / Writer threads modify the object
- Writers must have exclusive access to the object - but reader는 이 object를 무수히 다른 reader과 공유해야 할 수 있음
- Unlimited number of readers can access the object
- 어떤 object가 있는데 이 object가 read/write 가능함
- 누군가 read하고 있으면 해당 object를 write하면 안 되고 기다림
- “누군가가 읽고 있다” - 쓰려는 애들은 기다려야 하지만 읽고자 하는 obj는 읽게 해줌
- “누군가가 쓰고 있다”
- 누군가 write하고 있으면 read는 대기중
- 누군가 read : 다른 read가능 (누군가가 수정하고 있는 상태가 아니기 때문에)
- 누군가 read하고 있으면 해당 object를 write하면 안 되고 기다림
- Reader threads only read the object / Writer threads modify the object
- Occurs frequently in real systems, e.g.,
- Online airline reservation system
- 고객 : 좌석 할당 시스템을 동시에 조사 가능
- 예약 : Database에 배타적 접근
- Multithreaded caching Web proxy
- 무제한 Thread : 기존 page를 공유 page cache에서 가져올 수 있음
- cache에 새 페이지를 쓰는 모든 Thread에 배타적 접근
- Online airline reservation system
#
Variants of Readers-Writers
- First readers-writers problem (favors readers)
- No reader should be kept waiting unless a writer has already been granted permission to use the object 어떠한 reader도 writer가 이미 이 객체르 이용하도록 허가하지 않았으면 계속 기다려서는 안된다
- A reader that arrives after a waiting writer gets priority over the writer reader는 단지 writer가 기다리고 있다고 해서 기다려서는 안 된다.
- Favor for reader
- 어떤 object가 있다고 할 때 R1이 읽고 있다고 가정하자 → W1이 오면, W1는 wait하여야 한다 : 누군가가 read하고 있기 때문에 → R2가 오면 : R2는 wait없이 읽어도 된다. (Read에 favor)
- w1을 bypass하고 r2가 읽을 수 있도록 하고, r3…가 와도 읽을 수 있도록 한다.
→ write가 먼저 왔음에도 불구하고 현재 read를 하고 있으면 동일한 object에 대한 read는 계속 가능케 해 준다.
- w1 : starvation (Ri는 계속해서 read 가능하고, 그 중 절대 들어가지 못함)
- Second readers-writers problem (favors writers)
- Once a writer is ready to write, it performs its write as soon as possible writer를 쓸 준비가 되었다면 가능한 한 빨리 write작업 수행할 것 요구
- A reader that arrives after a writer must wait, even if the writer is also waiting 비록 writer가 기다리고 있을지라도 기다려야 함.
- Favor for Writer
- Write가 왔다 = write를 빠르게 처리해야 하기 때문에 뒤에 있는 녀석들을 처리하지 않는다 → writer가 끝난 다음에 reader를 처리한다 (최대한 빠르게 writer를 처리한다) → write에게 favor준다 하더라도 reader가 들어가서 안나오면 starvation
- Or, writer가 들어가서 나오지 않으면 reader는 starvation
- 사실 누구에게나 starvation이 발생하고 해결할 수 있는 solution이 아님
- Starvation (where a thread waits indefinitely) is possible in both cases
- 영원히 정지하고 진행하지 못하게 되는 thread
#
Solution to First Readers-Writers Problem
- starvation에 대한 solution을 제공하지 않음
- reader가 어느정도 이상이 되면 막는 coding : reader가 무한대로 들어오면 안 됨
- Starvation을 막기 위한 코드는 어떻게 구현하여야 하는가
- → 일정 threshold 까지 들어가게 되면 writer가 한 번 정도 들어가도록 수정
**w semaphore
: shared obj 접근하는 critical section으로의 접근 제어****mutex semaphore
: shared readcnt var로의 접근 보호**- 현재 critical section에 있는 reader수 count
**writer
: w mutex critical section에 들어갈 때마다 lock, 떠나면 unlock**- → critical section에 최대 하나의 Writer만 존재하도록 보장
- critical section에 처음으로 들어가는 reader만 w lock, critical section을 가장 마지막으로 떠나는 reader만이 이를 unlock하여 풀어준다 : 한 개의 reader가 W mutex를 가지고 있는 한 무수한 Reader가 critical section에 아무 방해 없이 들어갈 수 있음을 의미
- w가 1인 것은 : Reader, writer 둘 중 하나만 들어가야 함 대신 reader가 들어가면 여러개가 들어갈 수 있음
- if condition이 없으면 Reader 들어가거나 writer들어가거나 → 약간 extension하면 reader에게 favor → reader가 계속 들어갈 수 있어서, 조건문을 통해 read가 한 번 들어가 있다고 하면 그제서야 semaphore가지고 들어가고 Readcnt가 1보다 크게 되면 semaphore를 하지 않음
int readcnt; /* Initially = 0 */
sem_t mutex, w; /* Initially = 1 */
void reader(void)
{
while (1)
{
P(&mutex);
readcnt++;
if (readcnt == 1) /* First in */
P(&w);
V(&mutex); /* Critical section */ /* Reading happens */
P(&mutex);
readcnt--;
if (readcnt == 0) /* Last out */
V(&w);
V(&mutex);
}
}
void writer(void)
{
while (1)
{
P(&w); /* Critical section */ /* Writing happens */
V(&w);
}
}
- (질문) packet loss 발생 시 문제?
-> 문제 안됨
- socket 통신 : socket열어서 send receive information API 밑단 TCP IP Kernel에서 보정해줌
- packet loss 가 생기더라도 tcp쓴다고 하면 kernel의 tcp ip stack에서 protocol이 다 보정함 -> 잃어버리게 되면 다시 data 달라고 하여 줌
#
Prethread
#
Putting It All Together: Prethreaded Concurrent Server
- server : main thread - multiple work thread
- main thread : 반복해서 client accept request → 연결 식별자를 buffer에 저장
- work thread : 반복해서 remove descriptors, client service 이후 descriptor 기다림
- Thread pooling : thread를 만들어 놓고, 필요한 thread만 wake하여 사용하는 방식
- Thread를 미리 만들어 놓고 안 쓰는 thread는 sleep
- 지금까지는 connfd나오면 thread spawn한 다음 해당 thread가 connfd와 client끼리 connect하여 channel 로 echoing - 여기서는 Pool of worker thread
- Shared buffer : master thread가 accept할 때마다 connect descriptor
- master thread가 accept할 때마다 connected descriptor를 하나 씩 채워줌 (P=S model)
#
Prethreaded Concurrent Server Configuration
Prethreaded concurrent echo server : sbuf로 구현
//echoservert_pre.c
sbuf_t sbuf;
/* Shared buffer of connected descriptors */
int main(int argc, char **argv)
{
int i, listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;
listenfd = Open_listenfd(argv[1]);
sbuf_init(&sbuf, SBUFSIZE);
for (i = 0; i < NTHREADS; i++) /* Create worker threads */
Pthread_create(&tid, NULL, thread, NULL);
while (1)
{
clientlen = sizeof(struct sockaddr_storage);
connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd); /* Insert connfd in buffer */
}
}
//*tuning parameter
// Concurrency degree - work load, application에 따라 다름
// 너무 작게 -> 일할 thread가 없음 : memory에 있는 자료구조 소비
// Lockdb, Level db : buffering 한 다음 flush / - flush할
// worker thread를 만들어 놓고, connfd만 return하고
// sbuf package를 통해 insertion하여 buffer에 집어넣음
void *thread(void vargp)
{
Pthread_detach(pthread_self());
while (1)
{
int connfd = sbuf_remove(&sbuf);
/ Remove connfd from buf / echo_cnt(connfd);
/ Service client * / Close(connfd);
}
}
echoservert_pre.c 16
- thread란 함수는 뒤에서 main thread가 joinable할 필요 없으면 Pthread_Detach
- 자기는 joinable하고 While에 들어가 Buffer produce에서 Connfd를 하나씩 뺀다
- worker thread 하나씩 빼고 connfd를 주되, buffer에다 집어넣었으니까 하나씩 빼내어감
- sbuf remove, sbuf insert : 앞의 함수를 사용하면 됨
// echo_cnt.c
static int byte_cnt; /* Byte counter */
static sem_t mutex; /* and the mutex that protects it */
static void init_echo_cnt(void)
{
Sem_init(&mutex, 0, 1);
byte_cnt = 0;
}
- Sem_init(&mutex, 0, 1);
- mutex 초기화 하고 byte count - 총 echoing한 byte count 보내는 코드
- 여러 thread가 echoing했으니까 내가 echo할 때마다 몇 byte echoing했는지 shared variable로 global하게 잡아서 byte count에 increment
void echo_cnt(int connfd)
{
int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT; // static global
Pthread_once(&once, init_echo_cnt); //thread 여러개 뜨더라도 초기화 1회
Rio_readinitb(&rio, connfd); //connfd에서 buffer을 받아옴
while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0)
//bytecount : 여러 개 동시 Update하면 안 되니까 Mutex lock 잡아 둠
{
P(&mutex);
byte_cnt += n;
printf("thread %d received %d (%d total) bytes on fd %d\n", (int)pthread_self(), n, byte_cnt, connfd);
V(&mutex);
Rio_writen(connfd, buf, n);
}
}
#
Crucial concept: Thread Safety
thread safe하다의 의미가 무엇인지 개념적으로 배워보자.
- 모든 함수가 safe하지도 unsafe하지도 않다.
- Functions called from a thread must be thread-safe : A function is thread-safe thread에서 호출된 함수가 thread safe하다 ****\iff it will always produce correct results when called repeatedly from multiple concurrent threads 여러 thread가 concurrently 호출하고, 각 thread들이 excution flow에 따라 항상 correct한 결과가 나온다.
- correct한 것이란: 추상적 개념으로 thread safe :
내가 expect한 결과대로 나오면 safe한데
expected 결과로 나오지 않는다면 unsafe
- expected : thread-safe : Thread가 그 함수를 호출했을 때 thread-safe하다.
- Classes of thread-unsafe functions:
- Class 1: Functions that do not protect shared variables →lock variable
- Class 2: Functions that keep state across multiple invocations → rand var
- Class 3: Functions that return a pointer to a static variable → thread safe하게 바꾸는 기술 알려줄 것 : lock n copy
- Class 4: Functions that call thread-unsafe functions
- 대부분 여러 system call은 thread safe하게 구현되어 있으니 thread unsafe한 함수가 이렇게 존재한다.
- thread safe하게 바꾼 것이 rand_r, ctime_r이라던지 thread safe하게 만들었다.
- 그러나 unix programming하면은 thread-unsafe한 함수를 쓴다.
- 옛날 multiprocessing하려고 할 때 single processor, serialized된 상태였다.
- 옛날의 함수를 지금 사용하려고 하니까 오류
- 요즘과 같은 multi processor 시대에서는 thread unsafe하네 라는 문제들이 왕왕 생기게 되어 thread safe된 함수를 쓰라고 되어 있다.
- system call을 변환한 것들이 있는데 개발자로서 ctime과 비슷한 함수를 만들었다고 하면 thread safe한지, thread unsafe한지 확인
- thread unsafe하면 multi threading할 때 문제가 생길 수 있음을 인지하기 시작했으므로 ex. 격투기에 쓰이는 여러 가지 기술처럼 프로그래밍도 처음부터 짜는 것이 아닌, 자신만의 기술을 가지고 있어야 함 - 간헐적으로 발생하여 debug하기가 굉장히 어렵다.
- Thread-Safe하지 않는 function의 특징 네가지(case를 이해하면 쓰기 어렵지 않음)
- 기억해야 하는 이유 : 정보처리기사 자격증에서 외워 쓰는 것이 아니라, 실제 프로그래밍할 때 이런 규칙들이 있다 보니 신경써야 한다. 개발자가 할 부분이고, 그럼에도 실수하기 때문이다. bug free한 프로그램 (질문) 내가 한 번 호출하면 1이고 100이고 200이고. seed값이 변하지 않으면 정해진 sequence에 대하여 random : seed값이 바뀌지 않으면 항상 같은 결과값이 나온다. rnadom한 pattern이 동일하게 나옴을 보장할 수 없다. → seed를 누군가가 바꿀수도 있는데 이를 못 바꾸게 해 달라는 것. 동일하게 생성되어야 하는데 그렇지 않은 경우
#
Thread-Unsafe Functions (Class 1)
- Failing to protect shared variables
- Fix: Use P and V semaphore operations
- Example: goodcnt.c
- Issue: Synchronization operations will slow down code
- goodcnt : shared var이 있는데 count하는 var이고 초기화가 0으로 되어 있고 0으로 된 이 thread var을 increment
- 한 번씩 increment했으니까 2가 되어야 함 (expected, thread safe)
- count increment하는 함수가 thread safe하지 않다 (thread unsafe)
- binary semaphore하여 mutex lock을 걸어 앞 뒤를 보호했다.
- thread safe한 함수를 만들게 되면, 여러 thread가 increment할 때 cnt++하는 과정이
- 세 가지 instruction으로 구성되어 있는데 중간에 interleaving되는 경우가 나오지 않아 thread safe하여 항상 2가 나온다.
- mutex lock 사용: thread safe
- mutex lock 비사용: thread unsafe
#
Thread-Unsafe Functions (Class 2)
Relying on persistent state across multiple function invocations
Example: Random number generator that relies on static state
-
- ex. Pseudo random integer generator
- 매번 random함수를 호출하는데 state를 계속 keep해야 한다.
- Global var을 적게 사용하고 Call by ref로 들어가면 다른 thread에 unavailable
- → 없애는 방법을 서라. global variable을 최대한 피해라.
static unsigned int next = 1; /* rand: return pseudo-random integer on 0..32767 */ int rand(void) { next = next1103515245 + 12345; return (unsigned int)(next / 65536) % 32768; } /* srand: set seed for rand() */ void srand(unsigned int seed) { next = seed; }
- next를 static하게 공유하게 해 놓는다.
- → deterministic하게 시작한 처음의 seed가 1이니까 순서대로 나온다. (매 invocation마다 state를 keep하는 형태로 구현)
- seed를 바꾸는 srand함수
- 문제가 되는 이유: 임의의 번호 1 3 5 7 9 생산한다고 가정
- seed가 1부터 시작, 2씩 더하여 리턴하는 함수 srand
- 내가 state를 가지고 있으니 다음, 다음 다음, 넘어간다. thread가 중간에 들어가 next를 갑자기 3에서 3000으로 바꾸어 버리면 Rand를 다시 시행한다 하더라도 그 sequence를 다시 생성할 수 없다.
- correct하지 않다. / thread safe하지 않다.
- 왜 이렇게 프로그램을 짰는가? 모든 것을 thread safe하게 만들면 되는게 아닌가? 의문
-
#
Thread-Safe Random Number Generator
Rand에서는 parameter로 void로 넘어갔지만
nextp pointer : 내 seed와 공유되는 변수가 아닌 것
- Rand_r caller에서 선언한 local var의 ptr를 넘겨주기 때문에 자신 seed가지고 increment
- → thread unsafe한 것을 safe하게 바꾸어 줄 수 있다.
- → arg 의 일부로 state를 넘겨라
Pass state as part of argument / arg 의 일부로 state를 넘겨라
and, thereby, eliminate global state
- next는 thread들이 모두 호출할 수 있다.
- 여러 thread들이 각자 공유하고 있기 때문에 생기는 문제
- → global state가 발생하지 않도록 만드는 방법
/* rand_r - return pseudo-random integer on 0..32767 */ int rand_r(int *nextp) { *nextp = *nextp * 1103515245 + 12345; return (unsigned int)(*nextp / 65536) % 32768; }
- Consequence:programmer using rand_r must maintain seed
#
Thread-Unsafe Functions (Class 3)
- Returning a pointer to a static variable
- Fix 1. Rewrite function so caller passes address of variable to store result
- Requires changes in caller and callee
- Fix 2. Lock-and-copy
lock을 잡고 copy해라
- Requires simple changes in caller (and none in callee)
- However, caller must free memory.
/* lock-and-copy version */
char *ctime_ts(const time_t *timep, char *privatep)
{
char *sharedp;
P(&mutex);
sharedp = ctime(timep);
strcpy(privatep, sharedp);
V(&mutex);
return privatep;
}
lock and copy
ctime :
현재의 시간을 가져다 return해주는 function (from time_t struct)- global variable로 되어 있는데 이를 읽어 오는 것 (by system call)
- ctime을 수행하고 읽어오는 사이에 우연치 않게 interrupt 걸려 switch된다면
- → 내가 호출한 시점이 아닌 다음 시점에 ctime으로 내용이 바뀔 수 있다. 나는 모르고 interrupted 된 시간으로 return받을 수 있다.
- (ctime 자체가 기지고 있는 문제) 시간이 흘르며 s1→e1 도중 s2→e2에서 time이 들어간다.
- 함수 자체가 문제 있음 → lock : breakable하지 않게, atomic하게 잡아 중간에 누군가가 interleaving하여 들어오지 않게 함.
- lock을 잡고 return되면 copy하여 줌 : lock을 잡았기 때문에 이를 풀어야지 들어갈 수 있음
- private으로 받되, caller의 ptr로 받는다면 자기 것으로 받는다
- ptr를 받은 후 strcpy하는데 caller의 변수 stack에 있는 var의 address를 넘겨주니까
이를 dereference하며 lock and copy를 수행함
- fix1 : ctime을 다 뜯어 고칠수 있다
- fix2 : ctime을 그대로 쓰되 mutex lock : ctime_ts를 부르는 caller가 있으니가, 해당 caller는 call by reference를 그대로 받을 수 있다.
#
Thread-Unsafe Functions (Class 4)
- Calling thread-unsafe functions
- Calling one thread-unsafe function makes the entire function that calls it thread-unsafe
- Fix: Modify the function so it calls only thread-safe functions
#
Reentrant Functions
- Def: A function is reentrant \iff it accesses no shared variables when called by multiple threads.
지금 봤던 것은 : function들은 unsafe하냐, safe하냐로 나누어짐
- thread safe 중 어떤 것들은 reentrant :
- thread가 호출했을 때 shared variable을 Thread가 같이 접근하지만 safe하지 않을 때 safe하게 만들기: 피해나가는 방법.
- 그 shared variable이 없는 것 <책에서 없는 내용>
- safe, not reentrant, safe, reentrant
- unsafe를 safe하게는 만들어주는 능력은 필요하다.
- (unsafe, reentrant)는 큰 의미는 없다.
- Important subset of thread-safe functions
- Require no synchronization operations
- Only way to make a Class 2 function thread-safe is to make it reetnrant (e.g.,
rand_r
)
#
Thread-Safe Library Functions
- All functions in the Standard C Library (at the back of your K&R text) are thread-safe
- Examples:
malloc, free, printf, scanf
- printf는 thread-safe하긴 하다 (여러 thread 동시접근시) / but not async-signal-safe → NOT reentrant
- Examples:
- Most Unix system calls are thread-safe, with a few exceptions:
#
One worry: Races
- A race occurs when correctness of the program depends on one thread reaching point x before another thread reaches point y
- thread라는 fn 실행하여 call by reference로 해서 들어와서, I는 main thread의 local variable
- -> 주소값을 넘김
- → 새로 생성된 thread는 그 주소값을 myid라는 local variable을 stack에 copy하고 print해줌
- If) n=2 → iteration은 2번돈다.
// race.c
/* A threaded program with a race */
int main()
{
pthread_t tid[N];
int i; //N threads are sharing i
for (i = 0; i < N; i++)
Pthread_create(&tid[i], NULL, thread, &i);
for (i = 0; i < N; i++)
Pthread_join(tid[i], NULL);
exit(0);
}
/* Thread routine */
void *thread(void *vargp)
{
int myid = *((int *)vargp);
printf("Hello from thread %d\n", myid);
return NULL;
}
#
Race Illustration
- i라는 것을 dereference하여 print하려고 프로그램을 작성했는데 myid가 1이 나올수도 있다
- → myid가 0인줄 알고 실행했는데 i=1로 증가시키면 그 주소값을 보고 있기 때문에 0이 아닌 1이 print된다
- 이런 상황이 존재하면 thread간 race가 있다.
- 잘못된 상황이 발생하면
for (i = 0; i < N; i++)
Pthread_create(&tid[i], NULL, thread, &i);
- between increment of
i
in main thread - dereference ofvargp
in peer thread- If dereference happens while i = 0, then OK
- Otherwise, peer thread gets wrong id value
#
Could this race really occur?
- Race Test
- If no race, then each thread would get different value of i
- Set of saved values would consist of one copy each of 0 through 99
int i;
for (i = 0; i < 100; i++){
Pthread_create(&tid, NULL, thread, &i);
}
void *thread(void *vargp)
{
Pthread_detach(pthread_self());
int i = *((int *)vargp);
save_value(i);
return NULL;
}
race.c
#
Experimental Results
thread for iteration하여 100번 실행 → ideally, 0~99번까지 원래는 한 번씩 찍히는 게 맞다. 그러나 race가 있기 때문에
- single cpu laptop에서 구동시켰더니
- 1, 8, 16, 42가 두 번 찍힌다 = 한 번도 찍히지 않은 것들이 생긴다
- cpu에 hyper threading까지 되어 virtual cpu 8개 까지 수행됨 (3)
- 안찍히는 게 너무 많다.
- single cpu에서 multi cpu server로 시켰더니 cpu 개수가 많이자게 되면,control
- multicore cpu.arcitecture을 설계하게 되면 나중에
#
Race Elimination
Avoid unintended sharing of s tate
- 원래는 main 함수에다가 heap variable을 잡아 두고 thread를 가져다 각각의 ptr를 넘겨준다
- 각각에 대해서, heap variable을 보고 print하기 때문에 항상 한 번씩만 찍히게 된다.
- i를 가지고 넘기게 되면 iteration이 빨리 돌아올 수 있기 때문에
- heap var : 각 thread가 각자 들어가기 때문에 문제 해결 가능.
/* Threaded program without the race */
int main()
{
pthread_t tid[N];
int i, *ptr;
for (i = 0; i < N; i++)
{
ptr = Malloc(sizeof(int));
ptr = i;
Pthread_create(&tid[i], NULL, thread, ptr);
}
for (i = 0; i < N; i++)
Pthread_join(tid[i], NULL);
exit(0);
}
/* Thread routine */
void *thread(void *vargp)
{
int myid = *((int *)vargp);
Free(vargp);
printf("Hello from thread %d\n", myid);
return NULL;
}
#
Another worry: Deadlock
- Def: A process is deadlocked \iff it is waiting for a condition that will never be true
- 서로서로 기다리는 상태에서 전진하지 못하고 hanging하는 상태
- Typical Scenario
- Processes 1 and 2 needs two resources (A and B) to proceed / thread 1, 2번이 있는데 Process 1 acquires A, waits for B / t1은 a라는 자원을 가진 후 b라는 자원을 가져야 진행할 수 있다. Process 2 acquires B, waits for A / t2은 b라는 자원을 가진 후 a라는 자원을 가져야 진행할 수 있다. 공교롭게도 a,b를 각각 잡은 상태에서 t1은 b를 기다리고 t2는 a를 기다린다.
- Both will wait forever!
- 둘다 rsrc를 획득하려고 하는데 서로의 rsrc를 기다리고 있는 이런 상태라고 보면 된다.
- 두 thread가 hanging하여 진행하지 못함.
#
Deadlocking With Semaphores
- concurrent programming 기술 중 하나
- mutex한거 같은데 왜 hanging하고 있지 할 때 solution
- Deadlock 해결 → 같은 ordering을 주게 되면 deadlock을 해결할 수 있음
- t1 : a -> b / t2 : b -> a 에서 a->b
- 같은 순서로 획득할 수 있게 하면 deadlock을 회피할 수 있다
int main()
{
pthread_t tid[2];
Sem_init(&mutex[0], 0, 1); /* mutex[0] = 1 */
Sem_init(&mutex[1], 0, 1); /* mutex[1] = 1 */
Pthread_create(&tid[0], NULL, count, (void)0);
Pthread_create(&tid[1], NULL, count, (void *)1);
Pthread_join(tid[0], NULL);
Pthread_join(tid[1], NULL);
printf("cnt=%d\n", cnt);
exit(0);
}
void *count(void *vargp)
{
int i;
int id = (int)vargp;
for (i = 0; i < NITERS; i++)
{
P(&mutex[id]);
P(&mutex[1 - id]);
cnt++;
V(&mutex[id]);
V(&mutex[1 - id]);
}
return NULL;
}
Tid[0] :
P(s0);
P(s1);
cnt++;
V(s0);
V(s1);
Tid[1] :
P(s1);
P(s0);
cnt++;
V(s1);
V(s0);
#
Deadlock Visualized in Progress Graph
- Locking introduces the potential for deadlock: waiting for a condition that will never be true
- Any trajectory that enters the deadlock region will eventually reach the deadlock state, waiting for either s0 or s1 to become nonzero
- deadlock region : overlap되는 구간을 반드시 지나야 함
- Other trajectories luck out and skirt the deadlock region
- Unfortunate fact: deadlock is often nondeterministic (race)
#
Avoiding Deadlock Acquire shared resources in same order
int main()
{
pthread_t tid[2];
Sem_init(&mutex[0], 0, 1); /* mutex[0] = 1 */
Sem_init(&mutex[1], 0, 1); /* mutex[1] = 1 */
Pthread_create(&tid[0], NULL, count, (void *)0);
Pthread_create(&tid[1], NULL, count, (void *)1);
Pthread_join(tid[0], NULL);
Pthread_join(tid[1], NULL);
printf("cnt=%d\n", cnt);
exit(0);
}
void *count(void *vargp)
{
int i;
int id = (int)vargp;
for (i = 0; i < NITERS; i++)
{
P(&mutex[0]);
P(&mutex[1]);
cnt++;
V(&mutex[id]);
V(&mutex[1 - id]);
}
return NULL;
}
Tid[0] :
P(s0);
P(s1);
cnt++;
V(s0);
V(s1);
Tid[1] :
P(s0);
P(s1);
cnt++;
V(s1);
V(s0);
#
Avoided Deadlock in Progress Graph
- No way for trajectory to get stuck
- Processes acquire locks in same order
- Order in which locks released immaterial
- 순서를 바꾸어 수행하게 되면 → forbid region이 변화함
- overlap되는 부분을 들어가지 않으면 됨.
- (여담) 김영재 교수님석사 시절,
- thread가 여러개 돌아가는데 서로 msg 주고 받는 프로그램 → 정말 간헐적으로 생김
- 내가 짠 sw위에 benchmark를 돌리는데, 어떤 complexity로 돌리느냐에 따라서 정말 잘 돌아가거나 천번에 한번 안 돌아가거나 등 코드가 복잡해지다 보니 나도 모르게 실수하기도 한다 (deadlock 등)
- 하다가 그냥 멈춘다 : 내가 benchmark 1시간 돌리다가
- 죽지도 않고 그대로 멈춰 있어서 debug하기가 너무 힘들었다.
- 마구 달려들기 보단 thread safe-unsafe
- Race가 발생할 수 있으니까 조심해야 하고
- deadlock은 항상 회피해야 하는데 발생할 수 있으므로 차량용 급발진 등 example이 나온다.
- 요즘 자동차들도 sw로 구현 :sw bug가 언제든지 나타날수 있다
- critical software들은 정말 조심해서 test도 많이 하고 bug fix도 잘 해야한다.
- project 2 : Thread pooling을 해 보았다는 것 자체로도 자부심을 가져도 괜찮다
- 그 개념을 알고있다는 것 자체로도 cs core에 도달해 있다. 어렵긴 하다.