ACE_Reactor는 ACE_Event_Handler의 인스턴스를 관리합니다. ACE_Reactor::register_handler 함수로 관심 있는 이벤트 타입을 등록합니다. ACE_Reactor::remove_handler 함수로 기존에 등록된 이벤트 타입을 제거하도록 요청할 수 있습니다.
handle_input 함수는 새로운 사용자가 접속했거나, 새로운 데이터를 받았거나, 사용자가 접속을 끊었을 때 호출됩니다.
네트워크 서버는 기본적으로 ACE_SOCK_Acceptor로 사용자 접속을 기다립니다. ACE_SOCK_Acceptor::open 함수로 특정 IP와 포트에 바인딩할 수 있습니다. 접속을 기다리는 ACE_SOCK_Acceptor를 ACE_Reactor에 ACE_Event_Handler::ACCEPT_MASK로 등록하면 접속 이벤트가 handle_input으로 콜백됩니다.
사용자 접속이 이루어지면 그제야 ACE_Event_Handler를 상속받은 새로운 서비스 객체를 생성하고 접속을 허가합니다. 생성된 서비스 객체 역시 기존 ACE_Reactor 객체에 등록해서, 받길 원하는 이벤트 타입을 설정합니다.
handle_input에서 recv로 읽은 데이터의 길이가 0보다 작거나 같으면 -1을 반환(return -1)해서 접속을 끊어 주어야 합니다. 0 이하의 값은 사용자가 접속을 끊었거나 ACE_SOCK_Stream 객체에 알 수 없는 에러가 발생한 경우입니다.
EchoService.h
#pragma once
#include <ace/Event_Handler.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/INET_Addr.h>
#include <ace/Message_Queue.h>
#include <ace/Synch.h>
class EchoService:public ACE_Event_Handler
{
private:
ACE_SOCK_Stream peer_;
ACE_INET_Addr addr_;
ACE_Message_Queue<ACE_MT_SYNCH> send_datas_;
public:
EchoService(ACE_SOCK_Acceptor& acceptor, ACE_Reactor* reactor);
~EchoService(void);
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
};
EchoService.cpp
#include "EchoService.h"
#include <iostream>
#include <ace/OS.h>
/// Accepts the pending connection from `acceptor` into `peer_`, logs the
/// remote address and registers this handler for READ events on `reactor`.
///
/// The object is self-owning: if accepting or registering fails it releases
/// its resources and deletes itself, so the pointer returned by the
/// enclosing `new` expression must not be used by the caller.
///
/// NOTE(review): `delete this` inside a constructor is a fragile pattern; it
/// is kept here only because callers discard the pointer and no other
/// failure channel exists without changing them. A factory function that
/// accepts first and constructs afterwards would be the cleaner design.
EchoService::EchoService(ACE_SOCK_Acceptor& acceptor, ACE_Reactor* reactor)
    : ACE_Event_Handler(reactor), peer_(ACE_INVALID_HANDLE)
{
    if (-1 == acceptor.accept(peer_, &addr_)) {
        delete this;
        return;
    }

    // Zero-initialised so that a failed addr_to_string() prints an empty
    // string instead of uninitialised stack contents.
    char addr_string[256] = {0};
    addr_.addr_to_string(addr_string, sizeof(addr_string));
    std::cout<<std::endl<<"Enter Client:"<<addr_string<<std::endl;

    // Previously the result was ignored: a failed registration left an open
    // socket and an object that no reactor callback would ever reclaim.
    if (-1 == this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK)) {
        peer_.close();
        delete this;
        return;
    }
}
/// Nothing to do here: the socket is closed and the queue flushed in
/// handle_close() before the object deletes itself.
EchoService::~EchoService() {}
/// Returns the handle of the connected client socket.
ACE_HANDLE EchoService::get_handle() const
{
    return peer_.get_handle();
}
/// Reads whatever the client has sent, queues a copy for echoing and asks
/// the reactor for WRITE events so that handle_output() can send it back.
///
/// @param fd Unused.
/// @return 0 to stay registered, -1 to have the reactor call handle_close()
///         (peer closed, receive error, or the data could not be queued).
int
EchoService::handle_input (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/)
{
    const size_t BUF = 1024;
    char in[BUF];

    ACE_Time_Value poll(0, 0);  // relative timeout: do not wait in recv()
    const ssize_t len = this->peer_.recv(in, BUF, &poll);
    if (len <= 0) {  // client closed the connection, or a communication error
        return -1;
    }
    const size_t received = static_cast<size_t>(len);

    ACE_Message_Block* mb = new ACE_Message_Block(received);
    // copy() checks the available space and advances wr_ptr() itself, so a
    // block whose internal buffer could not be allocated is rejected here
    // instead of being written through.
    if (-1 == mb->copy(in, received)) {
        mb->release();
        return -1;
    }

    // Queue timeouts are absolute times, so a zero value has already expired
    // and makes the call non-blocking. Without it, enqueue_tail() waits once
    // the queue reaches its high-water mark, stalling the reactor thread that
    // is also responsible for draining the queue.
    ACE_Time_Value expired(0, 0);
    if (-1 == this->send_datas_.enqueue_tail(mb, &expired)) {
        mb->release();  // ownership was not transferred to the queue
        return -1;
    }

    if (-1 == this->reactor()->register_handler(this, ACE_Event_Handler::WRITE_MASK)) {
        // Nothing would ever send the queued data; let handle_close() clean up.
        return -1;
    }
    return 0;
}
/// Sends the message block at the head of the queue to the client.
///
/// A partially sent block is put back at the head of the queue so the
/// remainder goes out on the next WRITE event. Once the queue is empty the
/// WRITE mask is removed again so the reactor stops calling this function.
///
/// @param fd Unused.
/// @return 0 to stay registered, -1 to have the reactor call handle_close()
///         (send error, or the remainder could not be re-queued).
int
EchoService::handle_output (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/)
{
    ACE_Message_Block* mb = NULL;
    ACE_Time_Value expired(0, 0);  // absolute time in the past: never block on the queue

    // dequeue_head() fails when the queue is empty; the result used to be
    // ignored, which led to dereferencing a NULL block below.
    if (-1 != this->send_datas_.dequeue_head(mb, &expired) && mb != NULL) {
        ACE_Time_Value poll(0, 0);  // relative timeout: do not wait in send()
        const ssize_t len = this->peer_.send(mb->rd_ptr(), mb->length(), &poll);
        if (len < 0) {  // communication error
            mb->release();
            return -1;
        }

        mb->rd_ptr(static_cast<size_t>(len));
        if (mb->length() > 0) {
            // Partial send: keep the remainder first in line. The queue is
            // not empty afterwards, so the WRITE mask stays in place.
            if (-1 == this->send_datas_.enqueue_head(mb, &expired)) {
                mb->release();  // ownership was not transferred to the queue
                return -1;
            }
            return 0;
        }
        mb->release();
    }

    if (this->send_datas_.is_empty()) {
        ACE_Reactor_Mask m = ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::DONT_CALL;
        this->reactor()->remove_handler(this, m);
    }
    return 0;
}
/// Tears the connection down: logs the departing client, unregisters the
/// handler from the reactor, drops any unsent data, closes the socket and
/// finally destroys the object.
///
/// The handler is unregistered before the socket is closed because
/// get_handle() reads the handle from `peer_`. No member may be touched
/// after `delete this`.
///
/// @return Always 0.
int
EchoService::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
    char peer_name[256];
    addr_.addr_to_string(peer_name, sizeof(peer_name));
    std::cout << std::endl << "Exit Client:" << peer_name << std::endl;

    // DONT_CALL keeps the reactor from calling handle_close() again.
    const ACE_Reactor_Mask silent_all =
        ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
    reactor()->remove_handler(this, silent_all);

    send_datas_.flush();
    peer_.close();

    delete this;
    return 0;
}
GAcceptor.h
#pragma once
#include <ace/Event_Handler.h>
#include <ace/Reactor.h>
#include <ace/SOCK_Acceptor.h>
class GAcceptor:public ACE_Event_Handler
{
private:
ACE_SOCK_Acceptor acceptor_;
public:
GAcceptor(const char* ipstr,ACE_Reactor* reactor);
~GAcceptor(void);
virtual ACE_HANDLE get_handle (void) const;
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
};
GAcceptor.cpp
#include "GAcceptor.h"
#include <ace/INET_Addr.h>
#include <iostream>
#include "EchoService.h"
/// Opens a listening socket on `ipstr` (an "ip:port" string, with address
/// reuse enabled) and registers this handler for ACCEPT events on `reactor`.
///
/// The object is self-owning: if listening or registering fails it logs the
/// failure, releases its resources and deletes itself, so the pointer
/// returned by the enclosing `new` expression must not be used by the caller.
///
/// NOTE(review): `delete this` inside a constructor is a fragile pattern; it
/// is kept here only because callers discard the pointer and no other
/// failure channel exists without changing them.
GAcceptor::GAcceptor(const char* ipstr, ACE_Reactor* reactor)
    : ACE_Event_Handler(reactor)
{
    ACE_INET_Addr addr(ipstr);
    if (-1 == acceptor_.open(addr, 1)) {
        std::cout<<std::endl<<"Listen fail:"<<ipstr<<std::endl;
        delete this;
        return;
    }

    // Previously the result was ignored: a failed registration left a
    // listening socket that never accepted anything, while the log still
    // claimed that the server had started.
    if (-1 == this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK)) {
        std::cout<<std::endl<<"Register fail:"<<ipstr<<std::endl;
        acceptor_.close();
        delete this;
        return;
    }

    std::cout<<std::endl<<"Server start:"<<ipstr<<std::endl;
}
/// Nothing to do here: the listening socket is closed in handle_close()
/// before the object deletes itself.
GAcceptor::~GAcceptor() {}
/// Returns the handle of the listening socket.
ACE_HANDLE GAcceptor::get_handle() const
{
    return acceptor_.get_handle();
}
/// Called by the reactor when a connection is pending on the listening
/// socket. Creates an EchoService, which accepts the connection in its
/// constructor and manages its own lifetime from then on; the pointer is
/// intentionally not kept.
///
/// @param fd Unused.
/// @return Always 0, so the acceptor stays registered.
int
GAcceptor::handle_input (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/)
{
    ACE_Reactor* const owner = this->reactor();
    new EchoService(acceptor_, owner);
    return 0;
}
/// Shuts the acceptor down: unregisters it from the reactor, closes the
/// listening socket and destroys the object.
///
/// The handler is unregistered before the socket is closed because
/// get_handle() reads the handle from `acceptor_`. No member may be touched
/// after `delete this`.
///
/// @return Always 0.
int
GAcceptor::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
    // DONT_CALL keeps the reactor from calling handle_close() again.
    const ACE_Reactor_Mask silent_all =
        ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
    reactor()->remove_handler(this, silent_all);

    acceptor_.close();

    delete this;
    return 0;
}
Quit.h
#pragma once
#include <ace/Reactor.h>
#include <ace/Event_Handler.h>
class Quit:public ACE_Event_Handler
{
public:
Quit(ACE_Reactor* reactor);
~Quit(void);
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);
virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
};
Quit.cpp
#include "Quit.h"
#include <ace/OS.h>
/// Binds the handler to `reactor` and subscribes it to SIGBREAK, so that
/// handle_signal() is invoked when that signal is raised.
Quit::Quit(ACE_Reactor* reactor)
    : ACE_Event_Handler(reactor)
{
    ACE_Reactor* const owner = this->reactor();
    owner->register_handler(SIGBREAK, this);
}
/// Nothing to release; the handler owns no resources of its own.
Quit::~Quit() {}
int
Quit::handle_signal (int signum, siginfo_t * /*= 0*/, ucontext_t * /*= 0*/){
this->reactor()->notify(this);
ACE_OS::sleep(2);
return 0;
}
/// Asks the reactor to leave its event loop.
///
/// @param fd Unused.
/// @return Always -1, which asks the reactor to follow up with handle_close().
int
Quit::handle_exception (ACE_HANDLE fd/* = ACE_INVALID_HANDLE*/)
{
    ACE_Reactor* const loop = this->reactor();
    loop->end_reactor_event_loop();
    return -1;
}
/// Unregisters the handler from the reactor and destroys the object.
///
/// The constructor subscribed this object to SIGBREAK. That subscription
/// must be dropped before `delete this`; otherwise the signal dispatcher
/// keeps a pointer to the destroyed object and a later SIGBREAK would call
/// into freed memory. The original mask-based removal alone does not cover
/// it, because signal registrations are keyed by signal number rather than
/// by I/O handle.
///
/// @return Always 0.
int
Quit::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
{
    ACE_Reactor* const owner = this->reactor();

    // A null new disposition restores the default action for the signal.
    ACE_Sig_Action* const restore_default = 0;
    owner->remove_handler(SIGBREAK, restore_default);

    // Kept from the original. This class does not override get_handle(), so
    // this call is presumably a no-op for it.
    ACE_Reactor_Mask m = ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL;
    owner->remove_handler(this, m);

    delete this;
    return 0;
}
EchoServer.cpp
// EchoServer.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <ace/ACE.h>
#include "GAcceptor.h"
#include "Quit.h"
#include <ace/TP_Reactor.h>
#include <ace/Reactor.h>
#include <thread>
/// Thread entry point: runs the event loop of `app` on the calling thread
/// and returns when the loop ends. The loop's result is discarded.
void fn(ACE_Reactor* app)
{
    static_cast<void>(app->run_reactor_event_loop());
}
/// Program entry point: starts an echo server on two endpoints and runs the
/// reactor event loop on two threads until the loop is ended.
///
/// The reactor objects live in an inner scope so that they are destroyed
/// before ACE::fini() runs. Previously they were locals of the whole
/// function, so their destructors ran after ACE had already been shut down.
///
/// @return Always 0.
int _tmain(int argc, _TCHAR* argv[])
{
    ACE::init();
    {
        ACE_TP_Reactor tp;
        ACE_Reactor reactor(&tp);

        // Self-owning handlers: each registers itself with the reactor and
        // deletes itself later, so the pointers are intentionally not kept.
        new Quit(&reactor);
        new GAcceptor("192.168.0.2:1000", &reactor);
        new GAcceptor("192.168.0.2:5500", &reactor);

        std::thread t1(fn, &reactor);
        std::thread t2(fn, &reactor);
        t1.join();
        t2.join();
    }   // reactor and tp are destroyed here, while ACE is still initialised
    ACE::fini();
    return 0;
}