ACE_Asynch_Acceptor, ACE_Asynch_Connector, ACE_Service_Handler 클래스를 활용하면 Proactor 기반 Acceptor-Connector 프로그램을 제작할 수 있습니다.
본 예제는 에코 클라이언트를 만드는 과정을 설명합니다.
EchoService.h
#pragma once
#include <ace/Asynch_IO.h>
#include <ace/Asynch_Connector.h>
#include <ace/Message_Block.h>
#include <ace/Message_Queue.h>
#include <ace/Synch.h>
class EchoService:public ACE_Service_Handler
{
private:
ACE_Asynch_Read_Stream read_require_;
ACE_Message_Block read_msg_;
ACE_Asynch_Write_Stream write_require_;
ACE_Message_Queue<ACE_MT_SYNCH> write_msg_queue_;
ACE_Atomic_Op<ACE_Thread_Mutex, bool> writable_;
public:
EchoService(void);
~EchoService(void);
virtual void open (ACE_HANDLE new_handle,ACE_Message_Block &message_block);
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
int send( const char* data);
private:
int recv_();
void close_();
};
template<class HANDLER>
class MY_CONNECTOR:public ACE_Asynch_Connector<HANDLER>
{
private:
HANDLER& obj_;
public:
MY_CONNECTOR(HANDLER& obj):obj_(obj)\
virtual HANDLER *make_handler (void){
return &obj_;
}
};
EchoService.cpp
#include "EchoService.h"
#include <iostream>
#include <ace/OS.h>
EchoService::EchoService(void)
:read_msg_(1024),writable_(false)
{
}
EchoService::~EchoService(void)
{
this->close_();
}
void
EchoService::close_(){
this->read_require_.cancel();
this->write_require_.cancel();
ACE_OS::closesocket( this->handle());
this->handle(ACE_INVALID_HANDLE);
this->write_msg_queue_.flush();
this->writable_ = false;
}
void
EchoService::open (ACE_HANDLE new_handle,ACE_Message_Block &message_block){
this->read_require_.open(*this,this->handle(), 0, this->proactor());
this->write_require_.open(*this, this->handle(), 0, this->proactor());
this->recv_();
this->write_msg_queue_.flush();
this->writable_ = true;
}
int
EchoService::send( const char* data){
if( ACE_INVALID_HANDLE==this->handle()) return -1;
size_t len = ACE_OS::strlen( data )+1;
ACE_Message_Block* m;
ACE_NEW_RETURN(m, ACE_Message_Block(len), -1);
m->copy( data );
if( this->writable_.value()){
this->writable_=false;
return this->write_require_.write(*m, m->length());
}
return this->write_msg_queue_.enqueue_tail(m);
}
int
EchoService::recv_(){
this->read_msg_.reset();
return this->read_require_.read(this->read_msg_, this->read_msg_.capacity()-1);
}
void
EchoService::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
if(! result.success() ){
result.message_block().release();
std::cout<<std::endl<<"Error Code:"<< result.error();
this->close_();
return;
}
if(0== result.message_block().length()){
result.message_block().release();
if( this->write_msg_queue_.is_empty()){
this->writable_= true;
return;
}
ACE_Message_Block* m;
this->write_msg_queue_.dequeue_head(m);
this->write_require_.write(*m, m->length());
return ;
}
this->write_require_.write( result.message_block(), result.message_block().length());
}
void
EchoService::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
if(! result.success() ){
std::cout<<std::endl<<"Error Code:"<< result.error();
this->close_();
return;
}
if(0== result.message_block().length()){
std::cout<<std::endl<<"Disconnted!!";
this->close_();
return;
}
result.message_block().copy("");
std::cout<<std::endl<<"RECV:"<<(const char*) result.message_block().rd_ptr();
this->recv_();
}
PEchoClient2.cpp
// PEchoClient2.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <ace/ACE.h>
#include "EchoService.h"
#include <ace/Proactor.h>
#include <ace/Thread_Manager.h>
#include <string>
#include <iostream>
ACE_THR_FUNC_RETURN fn(void * arg){
EchoService* obj = (EchoService*) arg;
std::string cmd;
while(true){
std::cout<<std::endl<<"전송할 데이터를 입력하세요. 종료시 exit를 입력하세요."<<std::endl;
std::getline( std::cin, cmd );
if("exit"==cmd) break;
for( int i=0;i<1000;++i){
obj->send( cmd.c_str());
}
obj->send("end");
ACE_OS::sleep(6);
}
ACE_Proactor::instance()->proactor_end_event_loop();
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
ACE::init();
{
EchoService es;
MY_CONNECTOR<EchoService> cs(es);
cs.open(true,ACE_Proactor::instance(),true);
cs.connect(ACE_INET_Addr("192.168.0.2:1000"));
ACE_Thread_Manager tm;
tm.spawn(fn,&es);
ACE_Proactor::instance()->proactor_run_event_loop();
tm.wait();
}
ACE::fini();
return 0;
}