본문 바로가기

코딩/분산시스템 프로젝트

5) Primary-backup 구현하기

- 문제 설명
본 과제에서는 간단한 Primary-backup 프로토콜을 구현하여 복제 프로토콜에 대한 이해를 높여봅시다.
- 요구 사항
주어진 client.c, server.c 를 바탕으로 Primary-backup 프로토콜을 수행하는 클라이언트, 서버 프로그램을 완성하 세요.

 

- util.c

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <time.h>

// Constants
#define KEY_SIZE 16 // 사용할 KEY 크기이다. 16바이트.
#define VALUE_SIZE 16 // 사용할 VALUE 크기이다. 32바이트.
#define DATASET_SIZE 1000 // 데이터셋 크기
#define SET_SIZE 62     // 가능한 문자들의 수 (예: 영문 대소문자 + 숫자)
const char SET[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

// 가독성을 위해 메시지 타입별로 매크로를 만듬
#define READ_REQ 0
#define READ_REP 1
#define WRITE_REQ 2
#define WRITE_REP 3
#define READ 0
#define WRITE 1

struct KVS { // key value store 구조체
    uint8_t type;  // message type
    char key[KEY_SIZE]; // key
    char value[VALUE_SIZE]; // value
} __attribute__((packed));

char* get_type(struct KVS RecvMsg){
  char* type;
  if(RecvMsg.type == READ_REQ) type ="READ_REQ";
  else if(RecvMsg.type == READ_REP) type ="READ_REP";
  else if(RecvMsg.type == WRITE_REQ) type ="WRITE_REQ";
  else type ="WRITE_REP";
  return type;
}

uint64_t hash64(const char* str) {
    uint64_t value = 0;
    while (*str) {
        value = (value * 31 + (uint8_t)*str++) & 0xFFFFFFFFFFFFFFFF;
    }


    value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
    value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
    value = (value >> 32) ^ value;

    return value & 0xFFFFFFFFFFFFFFFF;
}

 

 

- client.c

#include "util.h"

const char* dst_ip = "127.0.0.1";
#define NUM_SRV 3

// 임의의 key를 생성해서 반환해줌
void generate_key(char* key) {
    uint64_t number = rand() % DATASET_SIZE;
    for (int i = 0; i < 5; ++i) number = ((number << 3) - number + 7) & 0xFFFFFFFFFFFFFFFF;
    key[KEY_SIZE - 1] = '\0';
    for (int i = KEY_SIZE - 2; i >= 0; i--) {
        int index = number % SET_SIZE;
        key[i] = SET[index];
        number /= SET_SIZE;
    }
}

int main(int argc, char *argv[]) {

  srand((unsigned int)time(NULL)+ getpid());  // 난수 발생기 초기화
  /* 서버 구조체 설정 */
	int SERVER_PORT = 5001;
	struct sockaddr_in srv_addr; // 패킷을 수신할 서버의 정보를 담을 소켓 구조체를 생성한다.
	memset(&srv_addr, 0, sizeof(srv_addr)); // 구조체를 모두 '0'으로 초기화해준다.
	srv_addr.sin_family = AF_INET; // IPv4를 사용할 것이므로 AF_INET으로 family를 지정한다.
	srv_addr.sin_port = htons(SERVER_PORT); // 서버의 포트번호를 넣어준다. 이 때 htons()를 통해 byte order를 network order로 변환한다.
	inet_pton(AF_INET, dst_ip, &srv_addr.sin_addr);  // 문자열인 IP주소를 바이너리로 변환한 후 소켓 구조체에 저장해준다.

  /* 소켓 생성 */
	int sock; // 소켓 디스크립터(socket descriptor)를 생성한다.
	if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { // socket()으로 IPv4(AF_INET), UDP(SOC_DGRAM)를 사용하는 소켓을 생성 시도한다.
		printf("Could not create socket\n"); // sock으로 return되는 값이 -1이라면 소켓 생성에 실패한 것이다.
		exit(1);
	}

	int n = 0;

  struct KVS SendMsg={0,}; // 송신용으로 쓸 메시지 구조체 생성 및 초기화
  struct KVS RecvMsg={0, }; // 수신용으로 쓸 메세지 구조체 생성 및 초기화

  struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
  socklen_t src_addr_len = sizeof(src_addr); // 수신한 패킷의 소켓 구조체 크기를 저장함. IPv4를 사용하므로 sockaddr_in 크기인 16바이트가 저장됨.
  int cnt = 0; // 패킷 5개를 전송한다.
  int pkt_size = 0;
	while(cnt < 5){
    printf("Request ID: %d\n",cnt++);

    generate_key(SendMsg.key); // 키 값 생성

    if (rand() % 2) { // 읽기 요청과 쓰기 요청을 50% 확률로 나눌 변수
      // 읽기 요청 
      SendMsg.type = READ_REQ;
      strcpy(SendMsg.value, " ");
      pkt_size = KEY_SIZE; // 패킷 사이즈 설정
    }
    else { 
      // 쓰기 요청
      SendMsg.type = WRITE_REQ;
      pkt_size = KEY_SIZE + VALUE_SIZE;
      strcpy(SendMsg.value, "AAAABBBBCCCCDDD"); // value 값 설정
    }

    // 콘솔 출력
    printf("Sent bytes: %d\n", pkt_size);
    printf("Type: %s Key: %s Key: %s\n", get_type(SendMsg), SendMsg.key, SendMsg.value);

    // sendto & recvfrom
    sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr *)&srv_addr, sizeof(srv_addr));
    n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr *)&src_addr, &src_addr_len);

    if (n > 0) { // 메세지가 있다면
      if (RecvMsg.type == READ_REP) pkt_size = KEY_SIZE + VALUE_SIZE;
      else if (RecvMsg.type == WRITE_REQ || RecvMsg.type == WRITE_REP) {
        pkt_size = KEY_SIZE;
        RecvMsg.type = WRITE_REP;
      }
      printf("Received bytes: %d\n", pkt_size);
      printf("Type: %s Key: %s Value: %s\n", get_type(RecvMsg), RecvMsg.key, RecvMsg.value);
    }
	}

	close(sock); // 소켓을 닫아준다.
	return 0;
}

 

 

- server.c

#include "util.h"

int SERVER_PORT; // 서버 포트번호
char* kv[DATASET_SIZE]; // 정적 key value stores
void init_kvs(){
  for(int i =0;i<DATASET_SIZE;i++){
		kv[i] = malloc(VALUE_SIZE);
		strcpy(kv[i], "DDDCCCCBBBBAAAA");
		//printf("%s\n",kv[i]);
	}

}

static volatile int quit = 0; // Trigger conditions for SIGINT
void signal_handler(int signum) {
	if(signum == SIGINT){  // Functions for Ctrl+C (SIGINT)
		quit = 1;
	}
}

int main(int argc, char *argv[]) {
	// 프로그램 시작시 입력받은 매개변수를 parsing한다.
	if ( argc < 2 ){
	 printf("Input : %s port number\n", argv[0]);
	 return 1;
	}

	signal(SIGINT, signal_handler); // SIGINT에 대한 핸들러 등록

	SERVER_PORT = atoi(argv[1]); // 입력받은 argument를 포트번호 변수에 넣어준다.

	// 서버의 정보를 담을 소켓 구조체 생성 및 초기화
	struct sockaddr_in srv_addr;
	memset(&srv_addr, 0, sizeof(srv_addr));
	srv_addr.sin_family = AF_INET;
	srv_addr.sin_port = htons(SERVER_PORT);
	srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 i.e., 자기 자신의 IP

	// 소켓을 생성한다.
	int sock;
	if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
		printf("Could not create listen socket\n");
		exit(1);
	}

	// 생성한 소켓에 소켓구조체를 bind시킨다.
	if ((bind(sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr))) < 0) {
		printf("Could not bind socket\n");
		exit(1);
	}

	init_kvs(); // key-value store 초기화

	int n = 0;
  	struct KVS RecvMsg={0,}; // 수신용으로 쓸 메시지 구조체 생성 및 초기화
	struct KVS SendMsg={0,}; // 수신용으로 쓸 메세지 구조체
	struct KVS AlarmMsg={0,}; // 알람용으로 쓸 메세지 구조체

	struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
  	socklen_t src_addr_len = sizeof(src_addr);
	socklen_t srv_addr_len = sizeof(src_addr);
	int pkt_size = 0;
	while (!quit) {
		// 메세지 받음
		n = recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len);
		printf("Type: %s Key: %s Value: %s\n", get_type(RecvMsg), RecvMsg.key, RecvMsg.value);
		
		// 해쉬 함수로 인덱스 값 얻음
		int index = hash64(RecvMsg.key) % DATASET_SIZE; 
		
		if (n > 0) { // 받은 데이터가 있는 경우
			SendMsg = RecvMsg;
			if (RecvMsg.type == READ_REQ) { // 읽기 요청인 경우
				strcpy(SendMsg.value, kv[index]); // 데이터를 KV에서 꺼낸다
				SendMsg.type = READ_REP;
			}
			else if (RecvMsg.type == WRITE_REQ) { // 쓰기 요청인 경우
				strcpy(kv[index], SendMsg.value); // 데이터를 KV에 넣는다
				if (srv_addr.sin_port == htons(5001)) { // Primary 서버인 경우
					for (int i = 1; i < 3; i++) { 
						srv_addr.sin_port = htons(SERVER_PORT + i); // 백업 서버 포트번호 설정
						// sendto & recvfrom
						sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
						n = recvfrom(sock, &AlarmMsg, sizeof(AlarmMsg), 0, (struct sockaddr*)&srv_addr, &srv_addr_len);
						if (n > 0) {
							//printf("Write is Success\n"); // 서버 올바르게 도착했는지 체크
						}
					}
					srv_addr.sin_port = htons(SERVER_PORT); // 서버 포트번호 초기화
					strcpy(SendMsg.value, "");
					printf("Write is Done\n");
				}
				else if (srv_addr.sin_port == htons(5002) || srv_addr.sin_port == htons(5003)) { // Backup 서버인 경우
					printf("Write is Done\n");
					RecvMsg.type = WRITE_REP;
				}
			}
		}
		sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr*)&src_addr, sizeof(src_addr));
	}

	printf("\nCtrl+C pressed. Exit the program after closing the socket\n");
	close(sock);

	return 0;
}