본문 바로가기

코딩/분산시스템 프로젝트

2) Distributed key-value stores 구현하기

- 과제명
Distributed key-value stores 구현하기

 


 

- util.h

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <time.h>

// Constants
#define KEY_SIZE 16 // 사용할 KEY 크기이다. 16바이트.
#define VALUE_SIZE 16 // 사용할 VALUE 크기이다. 32바이트.
#define DATASET_SIZE 1000 // 데이터셋 크기
#define SET_SIZE 62     // 가능한 문자들의 수 (예: 영문 대소문자 + 숫자)
const char SET[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

// 가독성을 위해 메시지 타입별로 매크로를 만듬
#define READ_REQ 0
#define READ_REP 1
#define WRITE_REQ 2
#define WRITE_REP 3
#define READ 0
#define WRITE 1

struct KVS { // key value store 구조체
    uint8_t type;  // message type
    char key[KEY_SIZE]; // key
    char value[VALUE_SIZE]; // value
} __attribute__((packed));

char* get_type(struct KVS RecvMsg){
  char* type;
  if(RecvMsg.type == READ_REQ) type ="READ_REQ";
  else if(RecvMsg.type == READ_REP) type ="READ_REP";
  else if(RecvMsg.type == WRITE_REQ) type ="WRITE_REQ";
  else type ="WRITE_REP";
  return type;
}

uint64_t hash64(const char* str) {
    uint64_t value = 0;
    while (*str) {
        value = (value * 31 + (uint8_t)*str++) & 0xFFFFFFFFFFFFFFFF;
    }


    value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
    value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
    value = (value >> 32) ^ value;

    return value & 0xFFFFFFFFFFFFFFFF;
}

 

 

- client.c

#include "util.h"

const char* dst_ip = "127.0.0.1"; // 하나의 host안에서 통신할 것이므로 서버주소는 localhost(i.e., 127.0.0.1)임
#define NUM_SRV 2

// 임의의 key를 생성해서 반환해줌
void generate_key(char* key) {
    uint64_t number = rand() % DATASET_SIZE;
    for (int i = 0; i < 5; ++i) number = ((number << 3) - number + 7) & 0xFFFFFFFFFFFFFFFF;
    key[KEY_SIZE - 1] = '\0';
    for (int i = KEY_SIZE - 2; i >= 0; i--) {
        int index = number % SET_SIZE;
        key[i] = SET[index];
        number /= SET_SIZE;
    }
}

int main(int argc, char *argv[]) {

  srand((unsigned int)time(NULL));  // 난수 발생기 초기화
  /* 서버 구조체 설정 */
	int SERVER_PORT = 5001; // 입력받은 argument를 포트번호 변수에 넣어준다.
	struct sockaddr_in srv_addr; // 패킷을 수신할 서버의 정보를 담을 소켓 구조체를 생성한다.
	memset(&srv_addr, 0, sizeof(srv_addr)); // 구조체를 모두 '0'으로 초기화해준다.
	srv_addr.sin_family = AF_INET; // IPv4를 사용할 것이므로 AF_INET으로 family를 지정한다.
	srv_addr.sin_port = htons(SERVER_PORT); // 서버의 포트번호를 넣어준다. 이 때 htons()를 통해 byte order를 network order로 변환한다.
	inet_pton(AF_INET, dst_ip, &srv_addr.sin_addr);  // 문자열인 IP주소를 바이너리로 변환한 후 소켓 구조체에 저장해준다.

  /* 소켓 생성 */
	int sock; // 소켓 디스크립터(socket descriptor)를 생성한다.
	if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { // socket()으로 IPv4(AF_INET), UDP(SOC_DGRAM)를 사용하는 소켓을 생성 시도한다.
		printf("Could not create socket\n"); // sock으로 return되는 값이 -1이라면 소켓 생성에 실패한 것이다.
		exit(1);
	}

	int n = 0;

  struct KVS SendMsg={0,}; // 송신용으로 쓸 메시지 구조체 생성 및 초기화

  struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
  socklen_t src_addr_len = sizeof(src_addr); // 수신한 패킷의 소켓 구조체 크기를 저장함. IPv4를 사용하므로 sockaddr_in 크기인 16바이트가 저장됨.
  int cnt = 0; // 패킷 5개를 전송한다.
  size_t pkt_size = 0;
	while(cnt < 5){
    printf("Request ID: %d\n",cnt++);
    // Your codes should be here
	generate_key(SendMsg.key); //키 생성
	int partition_num = hash64(SendMsg.key) % 2; //키를 해쉬함수 적용과 함께 서버수로 나머지 얻어서 파티션 번호 생성

	/* 파티션 번호에 따라 포트 변경 */
	if (partition_num == 0) srv_addr.sin_port = htons(SERVER_PORT); //파티션 번호가 0이라면 포트번호 SERVER_PORT 적용
	else srv_addr.sin_port = htons(SERVER_PORT + 1); //파티션 번호가 1이라면 포트번호 SERVER_PORT + 1 적용

	/* 랜덤으로 REQ 적용*/
	int size;
	if (rand() % 2 == 0) {
		SendMsg.type = READ_REQ;
		strcpy(SendMsg.value, " ");
		// 문자열 없음으로 취급되지 않음 => 아예 문장을 없앴는데도 sizeof(SendMsg)가 33으로 나옴
		// READ의 경우 17로 나오는 방법 찾아야 함 -> 일단 if문에 size를 넣어놓는 것으로 처리
		size = 17;
	}
	else {
		SendMsg.type = WRITE_REQ;
		strcpy(SendMsg.value, "AAAABBBBCCCCDDD");
		size = 33;
	}

	/* 출력 */
	printf("Partitions: %d\n", partition_num);
	printf("Sent bytes: %d\n", size); 

	char* type;
	if (SendMsg.type == READ_REQ) type = "READ_REQ";
	else if (SendMsg.type == READ_REP) type = "READ_REP";
	else if (SendMsg.type == WRITE_REQ) type = "WRITE_REQ";
	else type = "WRITE_REP";
	printf("Type: %s key: %s Value: %s\n", type, SendMsg.key, SendMsg.value);

	/* 전송 및 수신 */
	sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&srv_addr, sizeof(srv_addr)); //srv_addr
	n = recvfrom(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&src_addr, &src_addr_len); //src_addr

	/* 출력 */
	if (n > 0) {
		if (SendMsg.type == READ_REQ) type = "READ_REQ";
		else if (SendMsg.type == READ_REP) {
			type = "READ_REP";
			size = 33;
		}
		else if (SendMsg.type == WRITE_REQ) type = "WRITE_REQ";
		else {
			type = "WRITE_REP";
			size = 17;
		}
		printf("Received bytes: %d\n", size);
		printf("Type: %s key: %s Value: %s\n", type, SendMsg.key, SendMsg.value);
	}
	}

	close(sock); // 소켓을 닫아준다.
	return 0;
}

 

 

- server.c

#include "util.h"

int SERVER_PORT; // 서버 포트번호
char* kv[DATASET_SIZE]; // 정적 key value stores
void init_kvs(){
  for(int i =0;i<DATASET_SIZE;i++){
		kv[i] = malloc(VALUE_SIZE);
		strcpy(kv[i], "DDDCCCCBBBBAAAA");
		//printf("%s\n",kv[i]);
	}

}

static volatile int quit = 0; // Trigger conditions for SIGINT
void signal_handler(int signum) {
	if(signum == SIGINT){  // Functions for Ctrl+C (SIGINT)
		quit = 1;
	}
}

int main(int argc, char *argv[]) {
	// 프로그램 시작시 입력받은 매개변수를 parsing한다.
	if ( argc < 2 ){
	 printf("Input : %s port number\n", argv[0]);
	 return 1;
	}

	signal(SIGINT, signal_handler); // SIGINT에 대한 핸들러 등록

	SERVER_PORT = atoi(argv[1]); // 입력받은 argument를 포트번호 변수에 넣어준다.

	// 서버의 정보를 담을 소켓 구조체 생성 및 초기화
	struct sockaddr_in srv_addr;
	memset(&srv_addr, 0, sizeof(srv_addr));
	srv_addr.sin_family = AF_INET;
	srv_addr.sin_port = htons(SERVER_PORT);
	srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 i.e., 자기 자신의 IP

	// 소켓을 생성한다.
	int sock;
	if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		printf("Could not create listen socket\n");
		exit(1);
	}

	// 생성한 소켓에 소켓구조체를 bind시킨다.
	if ((bind(sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr))) < 0) {
		printf("Could not bind socket\n");
		exit(1);
	}

	init_kvs(); // key-value store 초기화

	int n = 0;
  struct KVS RecvMsg; // 수신용으로 쓸 메시지 구조체 생성

	struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
  socklen_t src_addr_len = sizeof(src_addr);
	size_t pkt_size = 0;
	while (!quit) {
    // Your codes should be here
		n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len); // 클라이언트에서 받음

		/* 출력 */
		char* type;
		if (RecvMsg.type == READ_REQ) type = "READ_REQ";
		else if (RecvMsg.type == READ_REP) type = "READ_REP";
		else if (RecvMsg.type == WRITE_REQ) type = "WRITE_REQ";
		else type = "WRITE_REP";
		printf("Type: %s Key: %s Value: %s\n", type, RecvMsg.key, RecvMsg.value);

		if (n > 0) {
			int index = hash64(RecvMsg.key) % DATASET_SIZE; // 키값에 해쉬 함수 적용, DATASET_SIZE로 나눠 나머지로 index 받음

			/* 읽기 요청이라면 */
			if (RecvMsg.type == READ_REQ) { 
				RecvMsg.type = READ_REP; // type 변경
				strcpy(RecvMsg.value, kv[index]); // value 읽기
			}

			/* 쓰기 요청이라면 */
			else if (RecvMsg.type == WRITE_REQ) {
				RecvMsg.type = WRITE_REP; // type 변경
				strcpy(kv[index], RecvMsg.value); // value 쓰기
				strcpy(RecvMsg.value, " "); // 공백으로 만듦
			}
		}

		sendto(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, sizeof(src_addr)); // 클라이언트로 전송
	}

	printf("\nCtrl+C pressed. Exit the program after closing the socket\n");
	close(sock);

	return 0;
}