- 문제 설명
본 과제에서는 간단한 Primary-backup 프로토콜을 구현하여 복제 프로토콜에 대한 이해를 높여봅시다.
- 요구 사항
주어진 client.c, server.c 를 바탕으로 Primary-backup 프로토콜을 수행하는 클라이언트, 서버 프로그램을 완성하 세요.
- util.c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <time.h>
// Constants
#define KEY_SIZE 16 // 사용할 KEY 크기이다. 16바이트.
#define VALUE_SIZE 16 // 사용할 VALUE 크기이다. 32바이트.
#define DATASET_SIZE 1000 // 데이터셋 크기
#define SET_SIZE 62 // 가능한 문자들의 수 (예: 영문 대소문자 + 숫자)
const char SET[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
// 가독성을 위해 메시지 타입별로 매크로를 만듬
#define READ_REQ 0
#define READ_REP 1
#define WRITE_REQ 2
#define WRITE_REP 3
#define READ 0
#define WRITE 1
struct KVS { // key value store 구조체
uint8_t type; // message type
char key[KEY_SIZE]; // key
char value[VALUE_SIZE]; // value
} __attribute__((packed));
char* get_type(struct KVS RecvMsg){
char* type;
if(RecvMsg.type == READ_REQ) type ="READ_REQ";
else if(RecvMsg.type == READ_REP) type ="READ_REP";
else if(RecvMsg.type == WRITE_REQ) type ="WRITE_REQ";
else type ="WRITE_REP";
return type;
}
uint64_t hash64(const char* str) {
uint64_t value = 0;
while (*str) {
value = (value * 31 + (uint8_t)*str++) & 0xFFFFFFFFFFFFFFFF;
}
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (value >> 32) ^ value;
return value & 0xFFFFFFFFFFFFFFFF;
}
- client.c
#include "util.h"
const char* dst_ip = "127.0.0.1";
#define NUM_SRV 3
// 임의의 key를 생성해서 반환해줌
void generate_key(char* key) {
uint64_t number = rand() % DATASET_SIZE;
for (int i = 0; i < 5; ++i) number = ((number << 3) - number + 7) & 0xFFFFFFFFFFFFFFFF;
key[KEY_SIZE - 1] = '\0';
for (int i = KEY_SIZE - 2; i >= 0; i--) {
int index = number % SET_SIZE;
key[i] = SET[index];
number /= SET_SIZE;
}
}
int main(int argc, char *argv[]) {
srand((unsigned int)time(NULL)+ getpid()); // 난수 발생기 초기화
/* 서버 구조체 설정 */
int SERVER_PORT = 5001;
struct sockaddr_in srv_addr; // 패킷을 수신할 서버의 정보를 담을 소켓 구조체를 생성한다.
memset(&srv_addr, 0, sizeof(srv_addr)); // 구조체를 모두 '0'으로 초기화해준다.
srv_addr.sin_family = AF_INET; // IPv4를 사용할 것이므로 AF_INET으로 family를 지정한다.
srv_addr.sin_port = htons(SERVER_PORT); // 서버의 포트번호를 넣어준다. 이 때 htons()를 통해 byte order를 network order로 변환한다.
inet_pton(AF_INET, dst_ip, &srv_addr.sin_addr); // 문자열인 IP주소를 바이너리로 변환한 후 소켓 구조체에 저장해준다.
/* 소켓 생성 */
int sock; // 소켓 디스크립터(socket descriptor)를 생성한다.
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { // socket()으로 IPv4(AF_INET), UDP(SOC_DGRAM)를 사용하는 소켓을 생성 시도한다.
printf("Could not create socket\n"); // sock으로 return되는 값이 -1이라면 소켓 생성에 실패한 것이다.
exit(1);
}
int n = 0;
struct KVS SendMsg={0,}; // 송신용으로 쓸 메시지 구조체 생성 및 초기화
struct KVS RecvMsg={0, }; // 수신용으로 쓸 메세지 구조체 생성 및 초기화
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr); // 수신한 패킷의 소켓 구조체 크기를 저장함. IPv4를 사용하므로 sockaddr_in 크기인 16바이트가 저장됨.
int cnt = 0; // 패킷 5개를 전송한다.
int pkt_size = 0;
while(cnt < 5){
printf("Request ID: %d\n",cnt++);
generate_key(SendMsg.key); // 키 값 생성
if (rand() % 2) { // 읽기 요청과 쓰기 요청을 50% 확률로 나눌 변수
// 읽기 요청
SendMsg.type = READ_REQ;
strcpy(SendMsg.value, " ");
pkt_size = KEY_SIZE; // 패킷 사이즈 설정
}
else {
// 쓰기 요청
SendMsg.type = WRITE_REQ;
pkt_size = KEY_SIZE + VALUE_SIZE;
strcpy(SendMsg.value, "AAAABBBBCCCCDDD"); // value 값 설정
}
// 콘솔 출력
printf("Sent bytes: %d\n", pkt_size);
printf("Type: %s Key: %s Key: %s\n", get_type(SendMsg), SendMsg.key, SendMsg.value);
// sendto & recvfrom
sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&srv_addr, sizeof(srv_addr));
n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr *)&src_addr, &src_addr_len);
if (n > 0) { // 메세지가 있다면
if (RecvMsg.type == READ_REP) pkt_size = KEY_SIZE + VALUE_SIZE;
else if (RecvMsg.type == WRITE_REQ || RecvMsg.type == WRITE_REP) {
pkt_size = KEY_SIZE;
RecvMsg.type = WRITE_REP;
}
printf("Received bytes: %d\n", pkt_size);
printf("Type: %s Key: %s Value: %s\n", get_type(RecvMsg), RecvMsg.key, RecvMsg.value);
}
}
close(sock); // 소켓을 닫아준다.
return 0;
}
- server.c
#include "util.h"
int SERVER_PORT; // 서버 포트번호
char* kv[DATASET_SIZE]; // 정적 key value stores
void init_kvs(){
for(int i =0;i<DATASET_SIZE;i++){
kv[i] = malloc(VALUE_SIZE);
strcpy(kv[i], "DDDCCCCBBBBAAAA");
//printf("%s\n",kv[i]);
}
}
static volatile int quit = 0; // Trigger conditions for SIGINT
void signal_handler(int signum) {
if(signum == SIGINT){ // Functions for Ctrl+C (SIGINT)
quit = 1;
}
}
int main(int argc, char *argv[]) {
// 프로그램 시작시 입력받은 매개변수를 parsing한다.
if ( argc < 2 ){
printf("Input : %s port number\n", argv[0]);
return 1;
}
signal(SIGINT, signal_handler); // SIGINT에 대한 핸들러 등록
SERVER_PORT = atoi(argv[1]); // 입력받은 argument를 포트번호 변수에 넣어준다.
// 서버의 정보를 담을 소켓 구조체 생성 및 초기화
struct sockaddr_in srv_addr;
memset(&srv_addr, 0, sizeof(srv_addr));
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SERVER_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 i.e., 자기 자신의 IP
// 소켓을 생성한다.
int sock;
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
printf("Could not create listen socket\n");
exit(1);
}
// 생성한 소켓에 소켓구조체를 bind시킨다.
if ((bind(sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr))) < 0) {
printf("Could not bind socket\n");
exit(1);
}
init_kvs(); // key-value store 초기화
int n = 0;
struct KVS RecvMsg={0,}; // 수신용으로 쓸 메시지 구조체 생성 및 초기화
struct KVS SendMsg={0,}; // 수신용으로 쓸 메세지 구조체
struct KVS AlarmMsg={0,}; // 알람용으로 쓸 메세지 구조체
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr);
socklen_t srv_addr_len = sizeof(src_addr);
int pkt_size = 0;
while (!quit) {
// 메세지 받음
n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len);
printf("Type: %s Key: %s Value: %s\n", get_type(RecvMsg), RecvMsg.key, RecvMsg.value);
// 해쉬 함수로 인덱스 값 얻음
int index = hash64(RecvMsg.key) % DATASET_SIZE;
if (n > 0) { // 받은 데이터가 있는 경우
SendMsg = RecvMsg;
if (RecvMsg.type == READ_REQ) { // 읽기 요청인 경우
strcpy(SendMsg.value, kv[index]); // 데이터를 KV에서 꺼낸다
SendMsg.type = READ_REP;
}
else if (RecvMsg.type == WRITE_REQ) { // 쓰기 요청인 경우
strcpy(kv[index], SendMsg.value); // 데이터를 KV에 넣는다
if (srv_addr.sin_port == htons(5001)) { // Primary 서버인 경우
for (int i = 1; i < 3; i++) {
srv_addr.sin_port = htons(SERVER_PORT + i); // 백업 서버 포트번호 설정
// sendto & recvfrom
sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
n = recvfrom(sock, &AlarmMsg, sizeof(AlarmMsg), 0, (struct sockaddr*)&srv_addr, &srv_addr_len);
if (n > 0) {
//printf("Write is Success\n"); // 서버 올바르게 도착했는지 체크
}
}
srv_addr.sin_port = htons(SERVER_PORT); // 서버 포트번호 초기화
strcpy(SendMsg.value, "");
printf("Write is Done\n");
}
else if (srv_addr.sin_port == htons(5002) || srv_addr.sin_port == htons(5003)) { // Backup 서버인 경우
printf("Write is Done\n");
RecvMsg.type = WRITE_REP;
}
}
}
sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr*)&src_addr, sizeof(src_addr));
}
printf("\nCtrl+C pressed. Exit the program after closing the socket\n");
close(sock);
return 0;
}
'코딩 > 분산시스템 프로젝트' 카테고리의 다른 글
4) Power of two choices 로드 밸런싱 구현하기 (0) | 2024.05.24 |
---|---|
3) Look-aside cache 구현하기 (0) | 2024.05.17 |
2) Distributed key-value stores 구현하기 (0) | 2024.05.17 |
1) UDP Echo 프로그램 만들기 (0) | 2024.05.17 |