- 과제명
Distributed key-value stores 구현하기
- util.h
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <time.h>
// Constants
#define KEY_SIZE 16 // 사용할 KEY 크기이다. 16바이트.
#define VALUE_SIZE 16 // 사용할 VALUE 크기이다. 32바이트.
#define DATASET_SIZE 1000 // 데이터셋 크기
#define SET_SIZE 62 // 가능한 문자들의 수 (예: 영문 대소문자 + 숫자)
const char SET[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
// 가독성을 위해 메시지 타입별로 매크로를 만듬
#define READ_REQ 0
#define READ_REP 1
#define WRITE_REQ 2
#define WRITE_REP 3
#define READ 0
#define WRITE 1
struct KVS { // key value store 구조체
uint8_t type; // message type
char key[KEY_SIZE]; // key
char value[VALUE_SIZE]; // value
} __attribute__((packed));
char* get_type(struct KVS RecvMsg){
char* type;
if(RecvMsg.type == READ_REQ) type ="READ_REQ";
else if(RecvMsg.type == READ_REP) type ="READ_REP";
else if(RecvMsg.type == WRITE_REQ) type ="WRITE_REQ";
else type ="WRITE_REP";
return type;
}
uint64_t hash64(const char* str) {
uint64_t value = 0;
while (*str) {
value = (value * 31 + (uint8_t)*str++) & 0xFFFFFFFFFFFFFFFF;
}
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (value >> 32) ^ value;
return value & 0xFFFFFFFFFFFFFFFF;
}
- client.c
#include "util.h"
const char* dst_ip = "127.0.0.1"; // 하나의 host안에서 통신할 것이므로 서버주소는 localhost(i.e., 127.0.0.1)임
#define NUM_SRV 2
// 임의의 key를 생성해서 반환해줌
void generate_key(char* key) {
uint64_t number = rand() % DATASET_SIZE;
for (int i = 0; i < 5; ++i) number = ((number << 3) - number + 7) & 0xFFFFFFFFFFFFFFFF;
key[KEY_SIZE - 1] = '\0';
for (int i = KEY_SIZE - 2; i >= 0; i--) {
int index = number % SET_SIZE;
key[i] = SET[index];
number /= SET_SIZE;
}
}
int main(int argc, char *argv[]) {
srand((unsigned int)time(NULL)); // 난수 발생기 초기화
/* 서버 구조체 설정 */
int SERVER_PORT = 5001; // 입력받은 argument를 포트번호 변수에 넣어준다.
struct sockaddr_in srv_addr; // 패킷을 수신할 서버의 정보를 담을 소켓 구조체를 생성한다.
memset(&srv_addr, 0, sizeof(srv_addr)); // 구조체를 모두 '0'으로 초기화해준다.
srv_addr.sin_family = AF_INET; // IPv4를 사용할 것이므로 AF_INET으로 family를 지정한다.
srv_addr.sin_port = htons(SERVER_PORT); // 서버의 포트번호를 넣어준다. 이 때 htons()를 통해 byte order를 network order로 변환한다.
inet_pton(AF_INET, dst_ip, &srv_addr.sin_addr); // 문자열인 IP주소를 바이너리로 변환한 후 소켓 구조체에 저장해준다.
/* 소켓 생성 */
int sock; // 소켓 디스크립터(socket descriptor)를 생성한다.
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { // socket()으로 IPv4(AF_INET), UDP(SOC_DGRAM)를 사용하는 소켓을 생성 시도한다.
printf("Could not create socket\n"); // sock으로 return되는 값이 -1이라면 소켓 생성에 실패한 것이다.
exit(1);
}
int n = 0;
struct KVS SendMsg={0,}; // 송신용으로 쓸 메시지 구조체 생성 및 초기화
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr); // 수신한 패킷의 소켓 구조체 크기를 저장함. IPv4를 사용하므로 sockaddr_in 크기인 16바이트가 저장됨.
int cnt = 0; // 패킷 5개를 전송한다.
size_t pkt_size = 0;
while(cnt < 5){
printf("Request ID: %d\n",cnt++);
// Your codes should be here
generate_key(SendMsg.key); //키 생성
int partition_num = hash64(SendMsg.key) % 2; //키를 해쉬함수 적용과 함께 서버수로 나머지 얻어서 파티션 번호 생성
/* 파티션 번호에 따라 포트 변경 */
if (partition_num == 0) srv_addr.sin_port = htons(SERVER_PORT); //파티션 번호가 0이라면 포트번호 SERVER_PORT 적용
else srv_addr.sin_port = htons(SERVER_PORT + 1); //파티션 번호가 1이라면 포트번호 SERVER_PORT + 1 적용
/* 랜덤으로 REQ 적용*/
int size;
if (rand() % 2 == 0) {
SendMsg.type = READ_REQ;
strcpy(SendMsg.value, " ");
// 문자열 없음으로 취급되지 않음 => 아예 문장을 없앴는데도 sizeof(SendMsg)가 33으로 나옴
// READ의 경우 17로 나오는 방법 찾아야 함 -> 일단 if문에 size를 넣어놓는 것으로 처리
size = 17;
}
else {
SendMsg.type = WRITE_REQ;
strcpy(SendMsg.value, "AAAABBBBCCCCDDD");
size = 33;
}
/* 출력 */
printf("Partitions: %d\n", partition_num);
printf("Sent bytes: %d\n", size);
char* type;
if (SendMsg.type == READ_REQ) type = "READ_REQ";
else if (SendMsg.type == READ_REP) type = "READ_REP";
else if (SendMsg.type == WRITE_REQ) type = "WRITE_REQ";
else type = "WRITE_REP";
printf("Type: %s key: %s Value: %s\n", type, SendMsg.key, SendMsg.value);
/* 전송 및 수신 */
sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&srv_addr, sizeof(srv_addr)); //srv_addr
n = recvfrom(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&src_addr, &src_addr_len); //src_addr
/* 출력 */
if (n > 0) {
if (SendMsg.type == READ_REQ) type = "READ_REQ";
else if (SendMsg.type == READ_REP) {
type = "READ_REP";
size = 33;
}
else if (SendMsg.type == WRITE_REQ) type = "WRITE_REQ";
else {
type = "WRITE_REP";
size = 17;
}
printf("Received bytes: %d\n", size);
printf("Type: %s key: %s Value: %s\n", type, SendMsg.key, SendMsg.value);
}
}
close(sock); // 소켓을 닫아준다.
return 0;
}
- server.c
#include "util.h"
int SERVER_PORT; // 서버 포트번호
char* kv[DATASET_SIZE]; // 정적 key value stores
void init_kvs(){
for(int i =0;i<DATASET_SIZE;i++){
kv[i] = malloc(VALUE_SIZE);
strcpy(kv[i], "DDDCCCCBBBBAAAA");
//printf("%s\n",kv[i]);
}
}
static volatile int quit = 0; // Trigger conditions for SIGINT
void signal_handler(int signum) {
if(signum == SIGINT){ // Functions for Ctrl+C (SIGINT)
quit = 1;
}
}
int main(int argc, char *argv[]) {
// 프로그램 시작시 입력받은 매개변수를 parsing한다.
if ( argc < 2 ){
printf("Input : %s port number\n", argv[0]);
return 1;
}
signal(SIGINT, signal_handler); // SIGINT에 대한 핸들러 등록
SERVER_PORT = atoi(argv[1]); // 입력받은 argument를 포트번호 변수에 넣어준다.
// 서버의 정보를 담을 소켓 구조체 생성 및 초기화
struct sockaddr_in srv_addr;
memset(&srv_addr, 0, sizeof(srv_addr));
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SERVER_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 i.e., 자기 자신의 IP
// 소켓을 생성한다.
int sock;
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
printf("Could not create listen socket\n");
exit(1);
}
// 생성한 소켓에 소켓구조체를 bind시킨다.
if ((bind(sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr))) < 0) {
printf("Could not bind socket\n");
exit(1);
}
init_kvs(); // key-value store 초기화
int n = 0;
struct KVS RecvMsg; // 수신용으로 쓸 메시지 구조체 생성
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr);
size_t pkt_size = 0;
while (!quit) {
// Your codes should be here
n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len); // 클라이언트에서 받음
/* 출력 */
char* type;
if (RecvMsg.type == READ_REQ) type = "READ_REQ";
else if (RecvMsg.type == READ_REP) type = "READ_REP";
else if (RecvMsg.type == WRITE_REQ) type = "WRITE_REQ";
else type = "WRITE_REP";
printf("Type: %s Key: %s Value: %s\n", type, RecvMsg.key, RecvMsg.value);
if (n > 0) {
int index = hash64(RecvMsg.key) % DATASET_SIZE; // 키값에 해쉬 함수 적용, DATASET_SIZE로 나눠 나머지로 index 받음
/* 읽기 요청이라면 */
if (RecvMsg.type == READ_REQ) {
RecvMsg.type = READ_REP; // type 변경
strcpy(RecvMsg.value, kv[index]); // value 읽기
}
/* 쓰기 요청이라면 */
else if (RecvMsg.type == WRITE_REQ) {
RecvMsg.type = WRITE_REP; // type 변경
strcpy(kv[index], RecvMsg.value); // value 쓰기
strcpy(RecvMsg.value, " "); // 공백으로 만듦
}
}
sendto(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, sizeof(src_addr)); // 클라이언트로 전송
}
printf("\nCtrl+C pressed. Exit the program after closing the socket\n");
close(sock);
return 0;
}
'코딩 > 분산시스템 프로젝트' 카테고리의 다른 글
5) Primary-backup 구현하기 (0) | 2024.06.01 |
---|---|
4) Power of two choices 로드 밸런싱 구현하기 (0) | 2024.05.24 |
3) Look-aside cache 구현하기 (0) | 2024.05.17 |
1) UDP Echo 프로그램 만들기 (0) | 2024.05.17 |