- 문제 설명
우리는 수업 시간에 다양한 로드 밸런싱 알고리즘을 배웠습니다. 이제 power of two choices라는 새로운 알고리 즘을 직접 구현해서 배움을 심화시켜 봅시다.
Power of two choices는 random 로드밸런싱과 least connection을 혼 합한 것과 유사합니다.
방법은, 임의로 두 개의 서버를 선택을 한 후, 선택된 두 개의 서버 중에서 현재 전송중인 요청의 수 혹은 연결되어 있는 커넥션의 수가 적은 쪽을 선택합니다.
예를 들어서, 서버가 1~4까지 4개가 존재하 고, 어느 한 시점에 각 서버로 전송중인 요청의 수가 40 30 20 10이라고 가정 해봅시다. power of two choices에서 는 서버 1~4 중에 임의로 두 개를 선택합니다. 예를 들어서 서버 2와 4가 선택되었다고 가정합시다. 그럼 이제 서버 2의 전송중인 요청수와 서버 4의 전송중인 요청수를 비교합니다. 서버2는 30이고, 서버4는 10이므로, 최종 적으로 서버4가 목적지 서버가 됩니다.
- 요구사항
주어진 client.c, server.c 를 바탕으로 power of two choices 로드 밸런싱 알고리즘을 사용하는 클라이언트와, 요 청을 처리하는 서버를 완성하세요.
- util.h
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <time.h>
// Constants
#define KEY_SIZE 16 // 사용할 KEY 크기이다. 16바이트.
#define VALUE_SIZE 16 // 사용할 VALUE 크기이다. 32바이트.
#define DATASET_SIZE 1000 // 데이터셋 크기
#define SET_SIZE 62 // 가능한 문자들의 수 (예: 영문 대소문자 + 숫자)
const char SET[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
// 가독성을 위해 메시지 타입별로 매크로를 만듬
#define READ_REQ 0
#define READ_REP 1
#define WRITE_REQ 2
#define WRITE_REP 3
#define READ 0
#define WRITE 1
struct KVS { // key value store 구조체
uint8_t type; // message type
char key[KEY_SIZE]; // key
char value[VALUE_SIZE]; // value
} __attribute__((packed));
char* get_type(struct KVS RecvMsg){
char* type;
if(RecvMsg.type == READ_REQ) type ="READ_REQ";
else if(RecvMsg.type == READ_REP) type ="READ_REP";
else if(RecvMsg.type == WRITE_REQ) type ="WRITE_REQ";
else type ="WRITE_REP";
return type;
}
uint64_t hash64(const char* str) {
uint64_t value = 0;
while (*str) {
value = (value * 31 + (uint8_t)*str++) & 0xFFFFFFFFFFFFFFFF;
}
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (((value >> 32) ^ value) * 0xc4ceb9fe1a85ec53) & 0xFFFFFFFFFFFFFFFF;
value = (value >> 32) ^ value;
return value & 0xFFFFFFFFFFFFFFFF;
}
- client.c
#include "util.h"
const char* dst_ip = "127.0.0.1";
#define NUM_SRV 4
// 임의의 key를 생성해서 반환해줌
void generate_key(char* key) {
uint64_t number = rand() % DATASET_SIZE;
for (int i = 0; i < 5; ++i) number = ((number << 3) - number + 7) & 0xFFFFFFFFFFFFFFFF;
key[KEY_SIZE - 1] = '\0';
for (int i = KEY_SIZE - 2; i >= 0; i--) {
int index = number % SET_SIZE;
key[i] = SET[index];
number /= SET_SIZE;
}
}
int main(int argc, char *argv[]) {
srand((unsigned int)time(NULL)); // 난수 발생기 초기화
/* 서버 구조체 설정 */
int SERVER_PORT = 5001;
struct sockaddr_in srv_addr; // 패킷을 수신할 서버의 정보를 담을 소켓 구조체를 생성한다.
memset(&srv_addr, 0, sizeof(srv_addr)); // 구조체를 모두 '0'으로 초기화해준다.
srv_addr.sin_family = AF_INET; // IPv4를 사용할 것이므로 AF_INET으로 family를 지정한다.
srv_addr.sin_port = htons(SERVER_PORT); // 서버의 포트번호를 넣어준다. 이 때 htons()를 통해 byte order를 network order로 변환한다.
inet_pton(AF_INET, dst_ip, &srv_addr.sin_addr); // 문자열인 IP주소를 바이너리로 변환한 후 소켓 구조체에 저장해준다.
/* 소켓 생성 */
int sock; // 소켓 디스크립터(socket descriptor)를 생성한다.
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { // socket()으로 IPv4(AF_INET), UDP(SOC_DGRAM)를 사용하는 소켓을 생성 시도한다.
printf("Could not create socket\n"); // sock으로 return되는 값이 -1이라면 소켓 생성에 실패한 것이다.
exit(1);
}
int n = 0;
struct KVS SendMsg={0,}; // 송신용으로 쓸 메시지 구조체 생성 및 초기화
struct KVS RecvMsg={0,}; // 수신용으로 쓸 메시지 구조체 생성 및 초기화
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr); // 수신한 패킷의 소켓 구조체 크기를 저장함. IPv4를 사용하므로 sockaddr_in 크기인 16바이트가 저장됨.
int cnt = 0; // 패킷 5개를 전송한다.
size_t pkt_size = 0;
const int SERVER_NUM = 4; // 서버의 수
int serverLoad[SERVER_NUM]; //서버 부하를 누적하는 배열
while(cnt < 5){
printf("Request ID: %d\n",cnt++); // ID 출력
SendMsg.type = READ_REQ; // 메세지 구조체 타입 설정
generate_key(SendMsg.key); // 메세지 구조체 키 설정
srv_addr.sin_port = htons(SERVER_PORT); // 포트 번호를 기본으로 되돌린다.
// 두개의 서버를 랜덤으로 선택하고 비교해, 누적 수가 적은 서버를 선택한다.
int server1 = rand() % SERVER_NUM; // 서버1 랜덤 선택
int server2 = rand() % SERVER_NUM; // 서버2 랜덤 선택
int selectedServer = (serverLoad[server1] < serverLoad[server2]) ? server1: server2; // 누적 수가 작은 서버 선택
serverLoad[selectedServer] += 1; // 선택된 서버의 누적 수 1 증가
srv_addr.sin_port = htons(SERVER_PORT + selectedServer); // 선택된 서버에 맞게 포트 번호 변경
pkt_size = 17; // 패킷 사이즈 설정
printf("Sent bytes: %zu\n", pkt_size); // 송신 패킷 사이즈 출력
sendto(sock, &SendMsg, sizeof(SendMsg), 0, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); // 요청 전송
printf("Type: %s Key: %s Value:\n", get_type(SendMsg), SendMsg.key);
recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len); // 답장 수신
printf("Received bytes: %ld\n", sizeof(RecvMsg));
printf("Type: %s Key: %s Value: %s\n", get_type(RecvMsg), RecvMsg.key, RecvMsg.value);
}
close(sock); // 소켓을 닫아준다.
return 0;
}
- server.c
#include "util.h"
int SERVER_PORT; // 서버 포트번호
char* kv[DATASET_SIZE]; // 정적 key value stores
void init_kvs(){
for(int i =0;i<DATASET_SIZE;i++){
kv[i] = malloc(VALUE_SIZE);
strcpy(kv[i], "DDDCCCCBBBBAAAA");
//printf("%s\n",kv[i]);
}
}
static volatile int quit = 0; // Trigger conditions for SIGINT
void signal_handler(int signum) {
if(signum == SIGINT){ // Functions for Ctrl+C (SIGINT)
quit = 1;
}
}
int main(int argc, char *argv[]) {
// 프로그램 시작시 입력받은 매개변수를 parsing한다.
if ( argc < 2 ){
printf("Input : %s port number\n", argv[0]);
return 1;
}
signal(SIGINT, signal_handler); // SIGINT에 대한 핸들러 등록
SERVER_PORT = atoi(argv[1]); // 입력받은 argument를 포트번호 변수에 넣어준다.
// 서버의 정보를 담을 소켓 구조체 생성 및 초기화
struct sockaddr_in srv_addr;
memset(&srv_addr, 0, sizeof(srv_addr));
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SERVER_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0 i.e., 자기 자신의 IP
// 소켓을 생성한다.
int sock;
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
printf("Could not create listen socket\n");
exit(1);
}
// 생성한 소켓에 소켓구조체를 bind시킨다.
if ((bind(sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr))) < 0) {
printf("Could not bind socket\n");
exit(1);
}
init_kvs(); // key-value store 초기화
int n = 0;
struct KVS RecvMsg={0,}; // 수신용으로 쓸 메시지 구조체 생성 및 초기화
struct sockaddr_in src_addr; // 패킷을 수신하였을 때, 해당 패킷을 보낸 송신자(Source)의 정보를 저장하기 위한 소켓 구조체
socklen_t src_addr_len = sizeof(src_addr);
size_t pkt_size = 0;
while (!quit) {
recvfrom(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, &src_addr_len); // 패킷 수신
printf("Type: %s Key: %s\n", get_type(RecvMsg), RecvMsg.key); // 출력
int index = hash64(RecvMsg.key) % DATASET_SIZE; // 해쉬 함수를 사용해 index 습득
strcpy(RecvMsg.value, kv[index]); // value 참조
sendto(sock, &RecvMsg, sizeof(RecvMsg), 0, (struct sockaddr*)&src_addr, sizeof(src_addr)); // 패킷 송신
}
printf("\nCtrl+C pressed. Exit the program after closing the socket\n");
close(sock);
return 0;
}
'코딩 > 분산시스템 프로젝트' 카테고리의 다른 글
5) Primary-backup 구현하기 (0) | 2024.06.01 |
---|---|
3) Look-aside cache 구현하기 (0) | 2024.05.17 |
2) Distributed key-value stores 구현하기 (0) | 2024.05.17 |
1) UDP Echo 프로그램 만들기 (0) | 2024.05.17 |