EchoService.h
#pragma once
#include <ace/Asynch_IO.h>
#include <ace/Message_Block.h>
#include <ace/Message_Queue.h>
#include <ace/Synch.h>
#include <ace/Proactor.h>
#include <ace/SOCK_Stream.h>
// Proactor-driven handler for a single socket.
//
// Reads are issued one at a time into a fixed-size buffer and the received
// text is printed to std::cout. Outgoing strings are written immediately when
// no write is in flight, otherwise they wait in a message queue.
class EchoService : public ACE_Handler
{
public:
    EchoService();
    ~EchoService();

    // Attaches the handler to `actor` and to the socket `handle`, opens the
    // asynchronous read/write streams and starts the first read.
    void open(ACE_Proactor* actor, const ACE_HANDLE& handle);

    // Sends the NUL-terminated string `data` (terminator included), or queues
    // it when a write is already pending. Returns -1 when no socket is attached.
    int send(const char* data);

    // Completion callback for reads started by recv_().
    virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result);

    // Completion callback for writes started by open() and send().
    virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result);

private:
    // Resets the read buffer and issues the next asynchronous read.
    void recv_();

    // Cancels pending I/O, closes the socket and marks the service unwritable.
    void close();

    // NOTE: data members are initialized in the order declared here.
    ACE_SOCK_Stream stream_;                             // the connected socket
    ACE_Asynch_Read_Stream read_require_;                // read request factory
    ACE_Message_Block read_msg_;                         // buffer reused by every read
    ACE_Asynch_Write_Stream write_require_;              // write request factory
    ACE_Message_Queue<ACE_MT_SYNCH> write_msg_queue_;    // blocks waiting to be written
    ACE_Atomic_Op<ACE_Thread_Mutex, bool> writable_;     // true when no write is in flight
};
EchoService.cpp
#include "EchoService.h"
#include <iostream>
// Creates an unconnected service: no socket handle, a 1024-byte read buffer,
// and writing disabled until open() is called.
//
// The initializers are listed in the same order as the members are declared
// (stream_, read_msg_, writable_). Members are always constructed in
// declaration order, so listing them differently was misleading and caused
// -Wreorder warnings.
EchoService::EchoService(void)
    : stream_(ACE_INVALID_HANDLE),
      read_msg_(1024),
      writable_(false)
{
}
// Shuts the connection down via close() before the members are destroyed.
EchoService::~EchoService()
{
    close();
}
// Binds this handler to a proactor and a socket, then starts asynchronous I/O.
//
// actor  - proactor that dispatches completions to this handler.
// handle - socket handle; it is stored in stream_ and used by both the read
//          and the write stream.
//
// The function returns void, so failures are reported on std::cout and the
// service is closed, the same way the completion handlers react to errors.
void
EchoService::open(ACE_Proactor* actor, const ACE_HANDLE& handle){
    this->proactor(actor);
    this->stream_.set_handle(handle);

    // Both asynchronous streams must be opened before any request is issued.
    if( this->read_require_.open(*this, this->stream_.get_handle(), 0, this->proactor()) == -1
        || this->write_require_.open(*this, this->stream_.get_handle(), 0, this->proactor()) == -1){
        this->close();
        std::cout<<std::endl<<"Error:open failed";
        return;
    }

    this->recv_();

    // Nothing to flush if the connection is no longer open at this point.
    if( ACE_INVALID_HANDLE == this->stream_.get_handle()){
        return;
    }

    // No backlog: the connection is idle, so send() may write directly.
    if( this->write_msg_queue_.is_empty()){
        this->writable_ = true;
        return;
    }

    // A backlog exists (handle_write_stream puts a block back on the queue
    // when its write fails). Start it while writable_ is false so that send()
    // keeps queueing behind it instead of issuing a second, overlapping write.
    ACE_Message_Block* m = 0;
    if( this->write_msg_queue_.dequeue_head(m) == -1 || 0 == m){
        this->writable_ = true;
        return;
    }
    this->writable_ = false;
    if( this->write_require_.write(*m, m->length()) == -1){
        // Keep the block for a later attempt rather than leaking it.
        this->write_msg_queue_.enqueue_head(m);
        this->close();
        std::cout<<std::endl<<"Error:write failed";
    }
}
void
EchoService::close(){
this->read_require_.cancel();
this->write_require_.cancel();
this->stream_.close();
this->stream_.set_handle( ACE_INVALID_HANDLE);
this->writable_ = false;
}
// Re-arms the asynchronous read on the shared read buffer.
//
// If the request cannot be started the connection is closed and the failure
// is reported; ignoring it would silently stop all further reception.
void
EchoService::recv_(){
    read_msg_.reset();
    // One byte of the buffer is held back so handle_read_stream can append a
    // terminating NUL after the received data.
    if( this->read_require_.read( read_msg_, read_msg_.space() - 1) == -1){
        this->close();
        std::cout<<std::endl<<"Error:read failed";
    }
}
// Completion callback for an asynchronous read.
//
// A failed read or a read that delivered no bytes closes the connection and
// prints the reason. Otherwise the received bytes are NUL-terminated, printed
// as text, and the next read is started.
void
EchoService::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result){
    ACE_Message_Block& block = result.message_block();

    const bool failed = !result.success();
    if (failed || 0 == block.length()) {
        this->close();
        if (failed) {
            std::cout << std::endl << "Error:" << result.error();
        } else {
            std::cout << std::endl << "Disconnected!!";
        }
        return;
    }

    // Copying an empty string appends just its terminating NUL, which turns
    // the received bytes into a printable C string.
    block.copy("");
    std::cout << std::endl << "RECV:" << static_cast<const char*>(block.rd_ptr());

    this->recv_();
}
int
EchoService::send( const char* data){
if( ACE_INVALID_HANDLE== stream_.get_handle()){
return -1;
}
size_t len = ACE_OS::strlen( data )+1;
ACE_Message_Block* m;
ACE_NEW_RETURN(m, ACE_Message_Block(len), -1);
m->copy( data);
if( this->writable_.value() ){
this->writable_ = false;
return this->write_require_.write(*m, m->length());
}
return this->write_msg_queue_.enqueue_tail(m);
}
void
EchoService::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result){
if( !result.success()){
this->write_msg_queue_.enqueue_head(&result.message_block());
this->close();
std::cout<<std::endl<<"Error:"<<result.error();
return;
}
if( 0==result.message_block().length()){
result.message_block().release();
if( this->write_msg_queue_.is_empty()){
this->writable_ = true;
return;
}
ACE_Message_Block* m;
this->write_msg_queue_.dequeue_head(m);
this->write_require_.write(*m, m->length());
return;