비동기 IO 제어에 필요한 기술 요소
- 작업 요청 객체
- 작업 완료 객체 – ACE_Async_Result 파생 클래스
- 디스패처 객체 – ACE_Proactor
- 이벤트 객체 – ACE_Handler
비동기 IO 제어에 필요한 부수적인 기술 요소
- 버퍼 – ACE_Message_Block
- 핸들 – ACE_HANDLE
** 버퍼 관리 주의 사항
비동기 IO에서는 버퍼 제어권이 두가지입니다. 작업 요청이 이루어지면 버퍼의 제어권은 OS로 넘어갑니다. 작업 완료 단계에서는 다시 프로그래머에게 제어권이 넘어옵니다. 따라서 마지막에 버퍼를 소멸하는 과정에서는 필히 해당 버퍼를 사용할 가능성이 있는 작업 요청 객체의 cancel 함수를 호출해서 버퍼 제어권를 프로그래머로 전환한 후 소멸해야 합니다.
실제 Proactor 기반 서버에서는 제한된 버퍼만으로도 충분히 효과적입니다. 따라서 필요 이상으로 많은 버퍼를 사용할 필요가 없으며, 가능하면 버퍼를 재사용하는 습관을 가져야 합니다. 특히 버퍼 관리자를 도입해서 제한된 메모리에서 효율적으로 버퍼를 관리하면 성능 향상에 좋습니다. 확실한 사실은 Proactor 기반 서버에서는 버퍼 관리를 세밀하게 제어하면 여러 잇점이 존재합니다.
PAcceptor.h
#pragma once
#include <ace/Asynch_IO.h>
#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Message_Block.h>
#include <ace/Proactor.h>
class PAcceptor:public ACE_Handler
{
public:
PAcceptor(ACE_Proactor* actor, const char* addr);
~PAcceptor(void);
virtual void handle_accept (const ACE_Asynch_Accept::Result &result);
int run();
private:
ACE_Asynch_Accept require_;
ACE_INET_Addr addr_;
ACE_SOCK_Acceptor acceptor_;
ACE_Message_Block msg_;
};
PAcceptor.cpp
#include "PAcceptor.h"
#include <iostream>
#include "EchoService.h"
PAcceptor::PAcceptor(ACE_Proactor* actor, const char* addr)
:ACE_Handler(actor),addr_(addr),msg_( ( sizeof (sockaddr_in) + 16 ) * 2 )
{
acceptor_.open( addr_ );
this->require_.open(*this,acceptor_.get_handle(),0, this->proactor());
}
PAcceptor::~PAcceptor(void)
{
require_.cancel();
}
void
PAcceptor::handle_accept (const ACE_Asynch_Accept::Result &result){
std::cout<<std::endl<<"Enter Client:"<<result.accept_handle();
EchoService* svc = new EchoService( this->proactor(), result.accept_handle());
run();
}
int
PAcceptor::run(){
return this->require_.accept(msg_,0);
}
EchoService.h
#pragma once
#include <ace/Asynch_IO.h>
#include <ace/SOCK_Stream.h>
#include <ace/Message_Block.h>
#include <ace/Proactor.h>
class EchoService:public ACE_Handler
{
public:
EchoService(ACE_Proactor* actor, const ACE_HANDLE& handle);
~EchoService(void);
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
int run();
private:
ACE_Asynch_Read_Stream read_require_;
ACE_Asynch_Write_Stream write_require_;
ACE_SOCK_Stream stream_;
ACE_Message_Block msg_;
};
EchoService.cpp
#include "EchoService.h"
#include <iostream>
EchoService::EchoService(ACE_Proactor* actor, const ACE_HANDLE& handle)
:ACE_Handler(actor), stream_(handle), msg_(1024)
{
this->read_require_.open(*this, stream_.get_handle(),0,this->proactor());
this->write_require_.open(*this, stream_.get_handle(),0,this->proactor());
run();
}
EchoService::~EchoService(void)
{
this->read_require_.cancel();
this->write_require_.cancel();
}
int
EchoService::run(){
return this->read_require_.read( msg_, msg_.capacity());
}
void
EchoService::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
if( !result.success()){
std::cout<<std::endl<<"Error:"<<result.error();
delete this;
return;
}
if( 0==result.message_block().length() ){
std::cout<<std::endl<<"Client Exist";
delete this;
return ;
}
this->write_require_.write( result.message_block(), result.message_block().length());
}
void
EchoService::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
if( !result.success()){
std::cout<<std::endl<<"Error:"<<result.error();
delete this;
return;
}
if( 0==result.message_block().length() ){
result.message_block().reset();
this->read_require_.read(result.message_block(), result.message_block().capacity());
return;
}
this->write_require_.write(result.message_block(),result.message_block().length());
}
PEchoServer.cpp
// PEchoServer.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <ace/ACE.h>
#include "PAcceptor.h"
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
{
PAcceptor ac(ACE_Proactor::instance(), "192.168.0.2:1000");
ac.run();
ACE_Proactor::instance()->proactor_run_event_loop();
}
ACE::fini();
return 0;
}