6.3 The ACE_Task Class
Motivation
The ACE_Message_Queue class described in Section 6.2 can be used to
Decouple the flow of information from its processing
Link threads that execute producer/consumer services concurrently
To use a producer/consumer concurrency model effectively in an object-oriented program, however, each thread should be associated with the message queue and any other service-related information. To preserve modularity and cohesion, and to reduce coupling, it's therefore best to encapsulate an ACE_Message_Queue with its associated data and methods into one class whose service threads can access it directly.
Thread-spawning capabilities provided by popular OS platforms are based on each spawned thread invoking a C-style function call. The ACE_Thread_Manager wrapper facade class described in Chapter 9 of C++NPv1 implements portable multithreading capabilities. However, programmers must still pass a C-style function to its spawn() and spawn_n() methods. Providing a spawned thread with access to a C++ object requires a bridge to the C++ object environment. The CLD_Handler::open() method (page 172) illustrated this technique. Since implementing this technique manually for each class is repetitive, it's a good candidate for reuse. The ACE Task framework therefore defines ACE_Task to encapsulate a class's messaging capability and provide a portable way for thread(s) to execute in the context of an object.
Class Capabilities
ACE_Task is the basis of ACE's object-oriented concurrency framework. It provides the following capabilities:
It uses an instance of ACE_Message_Queue from Section 6.2 to separate data and requests from their processing.
It uses the ACE_Thread_Manager class to activate the task so it runs as an active object [POSA2] that processes its queued messages in one or more threads of control. Since each thread runs a designated class method, they can access all of the task's data members directly.
It inherits from ACE_Service_Object, so its instances can be configured dynamically via the ACE Service Configurator framework from Chapter 5.
It's a descendant of ACE_Event_Handler, so its instances can also serve as event handlers in the ACE Reactor framework from Chapter 3.
It provides virtual hook methods that application classes can reimplement for task-specific service execution and message handling.
Our focus in this section is on the ACE_Task capabilities for queueing and processing messages. It obtains its event-handling, configuration, and dynamic linking/unlinking capabilities as a consequence of inheriting from ACE classes described in previous chapters.
The interface for ACE_Task is shown in Figure 6.6. Since this class has a rich interface, we group the description of its methods into the three categories described below. Sidebar 60 (page 308) describes some additional ACE_Task methods that are related to the ACE Streams framework.
Figure 6.6. The ACE_Task Class
1. Task initialization methods. The methods used to initialize a task are shown in the following table:
Method        Description
ACE_Task()    Constructor that can assign pointers to the ACE_Message_Queue and ACE_Thread_Manager used by the task
open()        Hook method that performs application-defined initialization activities
thr_mgr()     Get and set a pointer to the task's ACE_Thread_Manager
msg_queue()   Get and set a pointer to the task's ACE_Message_Queue
activate()    Convert a task into an active object that runs in one or more threads
Applications can customize the startup behavior of an ACE_Task by overriding its open() hook method. This method allocates resources used by a task, such as connection handlers, I/O handles, and synchronization locks. Since open() is generally called after the init() method that's inherited via ACE_Service_Object, any options passed via the ACE Service Configurator framework have already been processed. Therefore, open() can act on those configured preferences. The open() method is also often used to convert a task into an active object by calling ACE_Task::activate() (page 187).
The thr_mgr() and msg_queue() methods make it possible to access and change the thread management and message queueing mechanisms used by a task. Alternative thread management and message queueing mechanisms can also be passed to the ACE_Task constructor when an instance of the class is created.
The activate() method uses the ACE_Thread_Manager pointer returned by the thr_mgr() accessor to spawn one or more threads that run within the task. This method converts the task into an active object whose thread(s) direct its own execution and response to events, rather than being driven entirely by passive method calls that borrow the thread of the caller. Sidebar 42 (page 186) describes how to avoid memory leaks when an activated task's thread(s) exit.
2. Task communication, processing, and synchronization methods. The following table describes the methods used to communicate between tasks and to process messages passively and actively within a task:
Method                      Description
svc()                       A hook method that can implement a task's service processing. It is executed by all threads spawned via the activate() method.
put()                       A hook method that can be used to pass a message to a task, where it can be processed immediately or queued for subsequent processing by the svc() hook method.
putq(), getq(), ungetq()    Insert, remove, and replace messages from the task's message queue. The putq(), getq(), and ungetq() methods simplify access to the enqueue_tail(), dequeue_head(), and enqueue_head() methods of a task's message queue, respectively.
A subclass of ACE_Task can perform application-defined processing on messages passed to it by overriding its put() and svc() hook methods to implement the following two processing models:
1. Passive processing. The put() method is used to pass messages to an ACE_Task. Pointers to ACE_Message_Blocks are passed between tasks to avoid data copying overhead. Task processing can be performed entirely in the context of put(), where the caller's thread is borrowed for the duration of its processing. A task's svc() hook method need not be used if the task only processes requests passively in put().
Sidebar 42: Avoiding Memory Leaks When Threads Exit
Calls to the ACE_Task::activate() or the ACE_Thread_Manager's spawn() and spawn_n() methods can include either of the following flags:
THR_DETACHED, which designates the spawned thread(s) as detached so that when the thread exits, the ACE_Thread_Manager ensures the storage used for the thread's state and exit status is reclaimed.
THR_JOINABLE, which designates the spawned thread(s) as joinable so that ACE_Thread_Manager ensures the identity and exit status of an exiting thread is retained until another thread reaps its exit status.
The terms detached and joinable stem from POSIX Pthreads [IEE96].
By default, ACE_Thread_Manager (and hence the ACE_Task class that uses it) spawns threads with the THR_JOINABLE flag. To avoid leaking resources that the OS holds for joinable threads, an application must call one of the following methods:
ACE_Task::wait(), which waits for all threads to exit an ACE_Task object
ACE_Thread_Manager::wait_task(), which waits for all threads to exit in a specified ACE_Task object
ACE_Thread_Manager::join(), which waits for a designated thread to exit.
If none of these methods are called, ACE and the OS won't reclaim the thread stack and exit status of a joinable thread, and the program will leak memory.
If it's inconvenient to wait for threads explicitly in your program, you can simply pass THR_DETACHED when spawning threads or activating tasks. Many networked application tasks and long-running daemon threads can be simplified by using detached threads. However, an application can't wait for a detached thread to finish with ACE_Task::wait() or obtain its exit status via join(). Applications can, however, use ACE_Thread_Manager::wait() to wait for both joinable and detached threads managed by an ACE_Thread_Manager to finish.
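The joinable/detached distinction is not specific to ACE. As a minimal sketch, the following uses standard C++ std::thread instead of ACE_Thread_Manager (an assumption made only to keep the example self-contained); run_joinable() and run_detached() are hypothetical names. It shows that a joinable thread must be reaped by someone, whereas a detached thread cannot be joined and has to report completion some other way.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Joinable: the spawner must reap the thread, in the spirit of
// THR_JOINABLE followed by a wait()/join() call.
int run_joinable() {
  int result = 0;
  std::thread worker([&result] { result = 42; });
  worker.join();   // Reap the thread; result is safe to read afterwards.
  return result;
}

// Detached: nobody can join the thread, so completion is signalled
// through a promise/future pair instead of an exit status.
int run_detached() {
  std::promise<int> done;
  std::future<int> outcome = done.get_future();
  std::thread worker([](std::promise<int> p) { p.set_value(7); },
                     std::move(done));
  worker.detach();                // Comparable in spirit to THR_DETACHED.
  assert(!worker.joinable());     // Joining is no longer possible.
  return outcome.get();           // Blocks until the thread sets the value.
}
```

Note that the detached variant needs its own completion channel, which mirrors the point above that ACE_Task::wait() and join() are unavailable for detached threads.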
2. Active processing. A task's application-defined processing can also be performed actively. In this case, one or more threads execute the task's svc() hook method to process messages concurrently with respect to other activities in an application. If a task's put() method doesn't perform all the processing on a message, it can use putq() to enqueue the message and return to its caller immediately.
A task's svc() hook method can use ACE_Task::getq() to dequeue messages placed onto the message queue and process them concurrently. The getq() method blocks until either a message is available on the message queue or the specified absolute timeout elapses. The blocking nature of getq() allows the thread(s) of a task to block and only wake up when there's work available on the message queue.
Unlike put(), the svc() method is never invoked by a client of a task directly. It's instead invoked by one or more threads when a task becomes an active object after its activate() method is called. The activate() method uses the ACE_Thread_Manager associated with an ACE_Task to spawn one or more threads, as shown below:
template <class SYNCH_STRATEGY> int
ACE_Task<SYNCH_STRATEGY>::activate (long flags,
                                    int n_threads,
                                    /* Other params omitted */) {
  // ...
  thr_mgr ()->spawn_n (n_threads,
                       &ACE_Task<SYNCH_STRATEGY>::svc_run,
                       ACE_static_cast (void *, this),
                       flags, /* Other params omitted */);
  // ...
}
The THR_SCOPE_SYSTEM, THR_SCOPE_PROCESS, THR_NEW_LWP, THR_DETACHED, and THR_JOINABLE flags in the table on page 190 of C++NPv1 can be passed in the flags parameter to activate(). Sidebar 42 describes how THR_DETACHED and THR_JOINABLE can be used to avoid memory leaks when threads exit.
ACE_Task::svc_run() is a static method used by activate() as an adapter function. It runs in the newly spawned thread(s) of control, which provide an execution context for the svc() hook method. Figure 6.7 illustrates the steps associated with activating an ACE_Task using the Windows _beginthreadex() function to spawn the thread. Naturally, the ACE_Task class shields applications from OS-specific details.
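The adapter idea can be illustrated without ACE. The sketch below is an assumption-laden miniature, not ACE's implementation: Counter_Task is a hypothetical class, and std::thread stands in for the OS-level spawning call. The static svc_run() has a C-compatible signature, receives the object as a void pointer, and forwards to the virtual svc() hook.

```cpp
#include <cassert>
#include <thread>

class Counter_Task {   // Hypothetical class, not part of ACE.
public:
  virtual ~Counter_Task() { wait(); }

  // Static adapter: its signature matches a C-style thread entry point,
  // and it forwards into the C++ object passed through arg.
  static void *svc_run(void *arg) {
    assert(arg != nullptr);
    Counter_Task *self = static_cast<Counter_Task *>(arg);
    self->svc();
    return nullptr;
  }

  // Spawn one thread that enters the object through svc_run().
  void activate() {
    thread_ = std::thread(&Counter_Task::svc_run,
                          static_cast<void *>(this));
  }

  // Block until the spawned thread has left svc().
  void wait() {
    if (thread_.joinable()) thread_.join();
  }

  int count() const { return count_; }

protected:
  // Hook that runs in the spawned thread with full access to members.
  virtual int svc() { ++count_; return 0; }

  int count_ = 0;

private:
  std::thread thread_;
};
```

Because svc_run() is a static member, it can call the protected svc() hook, so subclasses only override svc() and never see the void pointer plumbing.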
Figure 6.7. Activating an ACE Task
When an ACE_Task subclass executes as an active object, its svc() method runs an event loop that uses its getq() method to wait for messages to arrive on the task's message queue. This queue can buffer a sequence of data messages and control messages for subsequent processing by a task's svc() method. As messages arrive and are enqueued by a task's put() method, its svc() method runs in separate thread(s), dequeueing the messages and performing application-defined processing concurrently, as shown in Figure 6.8.
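To make the put()/svc() interplay concrete, here is a minimal sketch in standard C++. Mini_Task and all of its members are hypothetical and only loosely modelled on ACE_Task; the empty string used as a stop message is an assumption of this sketch, not an ACE convention. Producers call put(), which enqueues and returns immediately, while every worker thread runs the same svc() event loop that blocks in getq().

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

class Mini_Task {   // Hypothetical, loosely modelled on ACE_Task.
public:
  // shutdown_and_wait() must be called before the object is destroyed.
  virtual ~Mini_Task() = default;

  // Spawn n_threads threads that all run the svc() hook.
  void activate(int n_threads) {
    assert(n_threads > 0);
    for (int i = 0; i < n_threads; ++i)
      threads_.emplace_back([this] { svc(); });
  }

  // Runs in the caller's thread: enqueue and return immediately.
  void put(const std::string &msg) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      queue_.push(msg);
    }
    not_empty_.notify_one();
  }

  // Enqueue one empty "stop" message per thread, then join them all.
  void shutdown_and_wait() {
    for (std::size_t i = 0; i < threads_.size(); ++i) put("");
    for (std::thread &t : threads_) t.join();
    threads_.clear();
  }

  int processed() const { return processed_.load(); }

protected:
  // Block until a message is available, then dequeue it.
  std::string getq() {
    std::unique_lock<std::mutex> guard(lock_);
    not_empty_.wait(guard, [this] { return !queue_.empty(); });
    std::string msg = queue_.front();
    queue_.pop();
    return msg;
  }

  // Event loop executed by every worker thread.
  virtual void svc() {
    for (;;) {
      std::string msg = getq();
      if (msg.empty()) break;   // Stop message: leave the loop.
      ++processed_;             // Stand-in for application processing.
    }
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::queue<std::string> queue_;
  std::vector<std::thread> threads_;
  std::atomic<int> processed_{0};
};
```

Since the queue is FIFO, all data messages enqueued before the stop messages are dequeued first, so the workers drain pending work before exiting.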
Figure 6.8. Passing Messages Between ACE_Task Objects
Sidebar 43 compares and contrasts the ACE_Task capabilities with Java's Runnable interface and Thread class.
3. Task destruction. The methods used in the destruction of all or parts of a task are shown in the following table:
Method          Description
~ACE_Task()     Deletes resources allocated in the ACE_Task constructor, which includes the message queue, if it wasn't passed as a parameter to the constructor.
close()         Hook method that performs application-defined shutdown activities. This method should generally not be called directly by applications, particularly if the task is an active object.
flush()         Closes the message queue associated with the task, which frees all of its enqueued message blocks and releases any threads blocked on the queue.
thr_count()     Returns the number of threads currently active in the ACE_Task.
wait()          A barrier synchronizer that waits for all joinable threads running in this task to exit before returning.
The lifetime of an ACE_Task object is not tied to the lifetime of any threads activated in that object. Since deleting an ACE_Task object doesn't shut down any active threads, the threads must therefore exit before the task object can be deleted. ACE provides various ways to request a task's threads to exit, including the cooperative cancellation mechanism (pages 190-191 of C++NPv1). A task's message queue can also be used to pass a shutdown message to the task's threads, as described in Sidebar 41 (page 167).
Applications can customize an ACE_Task's destruction by overriding its close() hook method. This method can free application-defined resources allocated by a task, such as connection control blocks, I/O handles, and synchronization locks. Whereas the open() hook method should be called at most once per instance to initialize an object before it's activated, both the svc() and close() hook methods are called once in each thread. The svc() hook should therefore perform any needed per-thread initialization. The ACE Task framework will call the close() hook method in each thread after the svc() hook method returns, so avoid calling a task's close() hook directly, particularly if a task is an active object. This asymmetry between the open() and close() hook methods is necessary because there's no reliable opportunity to clean up a task's resources except in the task's threads.
Sidebar 43: ACE_Task vs. Java Runnable and Thread
If you've used Java's Runnable interface and Thread class [Lea00], the ACE_Task design should look familiar, as discussed below:
ACE_Task::activate() is similar to the Java Thread.start() method since they both spawn internal threads. The Java Thread.start() method spawns only one thread, whereas activate() can spawn multiple threads within the same ACE_Task, making it easy to implement thread pools as shown in the Example part of this section.
ACE_Task::svc() is similar to the Java Runnable.run() method since both methods are hooks that run in newly spawned thread(s) of control. The Java run() hook method executes in only a single thread per object, whereas the ACE_Task::svc() method can execute in multiple threads per task object.
ACE_Task contains a message queue that allows applications to exchange and buffer messages. In contrast, this type of queueing capability must be added by Java developers explicitly.
If a task activates multiple threads, the close() method must not free resources (or delete the task object itself) if other threads are still executing. The thr_count() method returns the number of threads still active in the task. ACE_Task decrements the thread count before calling close(), so if thr_count() returns a value greater than 0, the object is still active. The wait() method can be used to block until all threads running in this task exit, at which point thr_count() equals 0. Sidebar 44 (page 190) describes the steps to follow when destroying an ACE_Task.
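The "last thread out cleans up" rule can be sketched as follows. Pool_Task is hypothetical and written in standard C++. Passing the post-decrement count into close() is a design choice of this sketch, made so that exactly one thread observes zero; it does not reflect the signature of ACE_Task::close().

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

class Pool_Task {   // Hypothetical class, not part of ACE.
public:
  // Record the thread count first, then spawn the threads.
  void activate(int n_threads) {
    assert(n_threads > 0);
    thr_count_.store(n_threads);
    for (int i = 0; i < n_threads; ++i)
      threads_.emplace_back([this] { run_thread(); });
  }

  // Barrier: block until every thread has left the task.
  void wait() {
    for (std::thread &t : threads_) t.join();
    threads_.clear();
  }

  int thr_count() const { return thr_count_.load(); }
  int releases() const { return releases_.load(); }

private:
  // What the framework side does in each thread: run svc(), decrement
  // the thread count, and only then call close().
  void run_thread() {
    svc();
    int remaining = --thr_count_;   // Atomic; yields the new value.
    close(remaining);
  }

  void svc() { /* Application-defined processing would go here. */ }

  // Called once per thread; only the last thread releases resources.
  void close(int remaining) {
    if (remaining == 0) ++releases_;   // Stand-in for freeing resources.
  }

  std::atomic<int> thr_count_{0};
  std::atomic<int> releases_{0};
  std::vector<std::thread> threads_;
};
```

Every thread calls close(), but the shared resources are released exactly once, by whichever thread brings the count to zero.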
Example
This example shows how to combine ACE_Task and ACE_Message_Queue with the ACE_Reactor from Chapter 3 and ACE_Service_Config from Chapter 5 to implement a concurrent logging server. This server design is based on the Half-Sync/Half-Async pattern [POSA2] and the eager spawning thread pool strategy described in Chapter 5 of C++NPv1. Figure 6.9 (page 191) shows how a pool of worker threads is prespawned when the logging server is launched. Log records can be processed concurrently until the number of simultaneous client requests exceeds the number of worker threads. At this point, the main thread buffers additional requests in a synchronized ACE_Message_Queue until a worker thread becomes available or until the queue becomes full.
Figure 6.9. Architecture of the Thread Pool Logging Server
Sidebar 44: Destroying an ACE_Task
Special care must be taken when destroying an ACE_Task that runs as an active object. Before destroying an active object, ensure that the thread(s) running its svc() hook method have exited. Sidebar 41 (page 167) describes several techniques to shut down svc() hook methods that are blocked on a task's message queue.
If a task's life cycle is managed externally, whether dynamically allocated or instantiated on the stack, one way to ensure a proper destruction sequence looks like this:
My_Task *task = new My_Task; // Allocate a new task dynamically.
task->open (); // Initialize the task.
task->activate (); // Run task as an active object.
// ... do work ...
// Deactivate the message queue so the svc() method unblocks
// and the thread exits.
task->msg_queue ()->deactivate ();
task->wait (); // Wait for the thread to exit.
delete task; // Reclaim the task memory.
This technique relies on the task to exit all of its threads when the task's message queue is deactivated. This design introduces behavioral coupling, however, between the Task class and its users. Users depend on particular behavior when the message queue is deactivated, so any change to this behavior would cause undesired ripple effects throughout all systems that use the Task class.
If a task is allocated dynamically, it may therefore be better to have the task's close() hook delete itself when the last thread exits the task, rather than calling delete on a pointer to the task directly. You may still want to wait() on the threads to exit the task, however, particularly if you're preparing to shut down the process. On some OS platforms, when the main thread returns from main(), the entire process will be shut down immediately, whether there were other threads active or not.
The ACE_Message_Queue plays several roles in our thread pool logging server's half-sync/half-async concurrency design:
It decouples the main reactor thread from the thread pool. This design allows multiple worker threads to be active simultaneously. It also offloads the responsibility for maintaining queues of log record data from kernel space to user space, which has more virtual memory to queue log records than the kernel.
It helps to enforce flow control between clients and the server. When the number of bytes in the message queue reaches its high watermark, its flow control protocol blocks the main thread. As the underlying TCP socket buffers fill up, the flow control propagates back to the server's clients. This prevents clients from establishing new connections or sending log records until the worker threads have a chance to catch up and unblock the main thread.
Prespawning and queueing help to amortize the cost of thread creation, as well as constrain the use of OS resources, which can significantly improve server scalability.
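The flow control role can be illustrated with a byte-counted queue. The sketch below is a simplified stand-in for ACE_Message_Queue, written in standard C++; Bounded_Queue is a hypothetical name, and the sketch omits the low watermark entirely. A producer is held back while the queued byte count has reached the high watermark, and a dequeue lets it proceed.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

class Bounded_Queue {   // Hypothetical, simplified flow-controlled queue.
public:
  explicit Bounded_Queue(std::size_t high_water_mark)
    : high_water_mark_(high_water_mark) {
    assert(high_water_mark > 0);
  }

  // Wait up to timeout for the byte count to fall below the high
  // watermark. Returns false if the queue stayed full.
  bool enqueue(const std::string &msg, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> guard(lock_);
    if (!not_full_.wait_for(guard, timeout,
                            [this] { return bytes_ < high_water_mark_; }))
      return false;
    bytes_ += msg.size();
    queue_.push_back(msg);
    return true;
  }

  // Non-blocking dequeue that wakes up one blocked producer.
  bool dequeue(std::string &msg) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (queue_.empty()) return false;
      msg = queue_.front();
      queue_.pop_front();
      bytes_ -= msg.size();
    }
    not_full_.notify_one();
    return true;
  }

private:
  const std::size_t high_water_mark_;
  std::size_t bytes_ = 0;
  std::deque<std::string> queue_;
  std::mutex lock_;
  std::condition_variable not_full_;
};
```

In the logging server, the blocked producer is the main reactor thread, which is why back pressure eventually reaches the clients through the TCP socket buffers.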
The following table outlines the classes that we'll cover in the thread pool logging server example below:
Class                  Description
TP_Logging_Task        Runs as an active object, with a pool of threads that process and store log records inserted into its synchronized message queue
TP_Logging_Handler     Target of upcalls from the ACE_Reactor that receives log records from clients and inserts them into the TP_Logging_Task's message queue
TP_Logging_Acceptor    A factory that accepts connections and creates TP_Logging_Handler objects to process client requests
TP_Logging_Server      A facade class that integrates the other three classes together
The relationship between these classes is shown in Figure 6.10 (page 192). The TP_Logging_Acceptor and TP_Logging_Handler classes play the reactive role in the Half-Sync/Half-Async pattern and the TP_Logging_Task::svc() method, which runs concurrently in the worker threads, plays the synchronous role in the pattern.
Figure 6.10. The Thread Pool Logging Server Classes
Each class in Figure 6.10 is described below. We start by including the necessary ACE header files into the TP_Logging_Server.h file:
#include "ace/OS.h"
#include "ace/Auto_Ptr.h"
#include "ace/Singleton.h"
#include "ace/Synch.h"
#include "ace/Task.h"
#include "Logging_Acceptor.h"
#include "Logging_Event_Handler.h"
#include "Reactor_Logging_Server.h"
#include "TPLS_export.h"
TP_Logging_Task. This class provides the following capabilities:
It derives from ACE_Task, which it instantiates to provide a synchronized ACE_Message_Queue.
It spawns a pool of worker threads that all run the same svc() method to process and store log records inserted into its synchronized message queue.
The TP_Logging_Task is shown below:
class TP_Logging_Task : public ACE_Task<ACE_MT_SYNCH> {
  // Instantiated with an MT synchronization trait.
public:
  enum { MAX_THREADS = 4 };
  // ...Methods defined below...
};
The TP_Logging_Task::open() hook method calls ACE_Task::activate() to convert this task into an active object, as follows:
virtual int open (void * = 0)
{ return activate (THR_NEW_LWP, MAX_THREADS); }
If activate() returns successfully, the TP_Logging_Task::svc() method will be running in MAX_THREADS separate threads. We show the TP_Logging_Task::svc() method (page 199) after we describe the TP_Logging_Acceptor and TP_Logging_Handler classes.
The TP_Logging_Task::put() method inserts a message block containing a log record into the queue.
virtual int put (ACE_Message_Block *mblk,
ACE_Time_Value *timeout = 0)
{ return putq (mblk, timeout); }
We need only one instance of TP_Logging_Task, so we convert it into a singleton using the singleton adapter template described in Sidebar 45 (page 194). Since the TP_Logging_Task will be located in a DLL, however, we must use the ACE_Unmanaged_Singleton rather than ACE_Singleton. This design requires that we close the singleton explicitly when the logging task shuts down in TP_Logging_Server::fini() (page 201).
typedef ACE_Unmanaged_Singleton<TP_Logging_Task, ACE_Null_Mutex>
TP_LOGGING_TASK;
Since TP_LOGGING_TASK::instance() is only accessed from the main thread, we use ACE_Null_Mutex as the synchronization type parameter for ACE_Unmanaged_Singleton. If this singleton were accessed concurrently by other threads we'd need to parameterize it with ACE_Recursive_Thread_Mutex (Chapter 10 of C++NPv1) to serialize access.
TP_Logging_Acceptor. This class is a factory that provides the following capabilities:
It accepts connections from client logging daemons.
It creates TP_Logging_Handlers that receive log records from connected clients.
The TP_Logging_Acceptor class is shown below:
Sidebar 45: The ACE_Singleton Template Adapters
Although it's possible to code the TP_Logging_Task explicitly to be a singleton, this approach is tedious and error-prone. ACE therefore defines the following ACE_Singleton template adapter that applications can use to manage singleton life cycles:
template <class TYPE, class LOCK>
class ACE_Singleton : public ACE_Cleanup {
public:
  static TYPE *instance (void) {
    ACE_Singleton<TYPE, LOCK> *&s = singleton_;
    if (s == 0) {
      LOCK *lock = 0;
      ACE_GUARD_RETURN (LOCK, guard,
        ACE_Object_Manager::get_singleton_lock (lock), 0);
      if (s == 0) {
        ACE_NEW_RETURN (s, (ACE_Singleton<TYPE, LOCK>), 0);
        ACE_Object_Manager::at_exit (s);
      }
    }
    return &s->instance_;
  }
protected:
  ACE_Singleton (void); // Default constructor.
  TYPE instance_; // Contained instance.
  // Single instance of the <ACE_Singleton> adapter.
  static ACE_Singleton<TYPE, LOCK> *singleton_;
};
The ACE_Singleton::instance() static method uses the Double-Checked Locking Optimization pattern [POSA2] to construct and access an instance of the type-specific ACE_Singleton. It then registers the instance with the ACE_Object_Manager for cleanup at program termination. As described in Sidebar 23 of C++NPv1 (page 218), the ACE_Object_Manager assumes responsibility for destroying the ACE_Singleton instance, as well as the adapted TYPE instance.
A program can crash during singleton cleanup if the object code implementing TYPE is unlinked before the ACE_Object_Manager cleans up singletons, which is often the case for singletons located in dynamically linked services. We therefore recommend using ACE_Unmanaged_Singleton when defining singletons in DLLs that will be linked and unlinked dynamically. This class offers the same double-checked locking optimization to create the singleton. To destroy the singleton, however, requires an explicit call to ACE_Unmanaged_Singleton::close(). A dynamic service's fini() method is a good place to call this close() method, as shown in the TP_Logging_Server::fini() method (page 201).
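The difference between a managed and an unmanaged singleton comes down to who destroys the instance. The following sketch is a standard C++ approximation, not ACE's code: Unmanaged_Singleton and Logger_Config are hypothetical names, and std::atomic with std::mutex replaces ACE's locking macros. It creates the instance with double-checked locking and destroys it only on an explicit close().

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

template <class TYPE>
class Unmanaged_Singleton {   // Hypothetical; loosely after ACE's adapter.
public:
  // Double-checked locking: the lock is taken only on the slow path.
  static TYPE *instance() {
    TYPE *p = instance_.load(std::memory_order_acquire);
    if (p == nullptr) {
      std::lock_guard<std::mutex> guard(lock_);
      p = instance_.load(std::memory_order_relaxed);
      if (p == nullptr) {
        p = new TYPE;
        instance_.store(p, std::memory_order_release);
      }
    }
    assert(p != nullptr);
    return p;
  }

  // Nothing is registered for cleanup at exit, so the owner must call
  // close() explicitly, for example from a service's fini() method.
  static void close() {
    std::lock_guard<std::mutex> guard(lock_);
    delete instance_.exchange(nullptr);
  }

private:
  static std::atomic<TYPE *> instance_;
  static std::mutex lock_;
};

template <class TYPE>
std::atomic<TYPE *> Unmanaged_Singleton<TYPE>::instance_{nullptr};
template <class TYPE>
std::mutex Unmanaged_Singleton<TYPE>::lock_;

// Hypothetical payload type used only to exercise the adapter.
struct Logger_Config { int level = 3; };
```

Callers must not use a pointer obtained from instance() after close() has run, which is why close() belongs at a well-defined shutdown point.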
class TP_Logging_Acceptor : public Logging_Acceptor {
public:
  TP_Logging_Acceptor (ACE_Reactor *r = ACE_Reactor::instance ())
    : Logging_Acceptor (r) {}

  virtual int handle_input (ACE_HANDLE) {
    TP_Logging_Handler *peer_handler = 0;
    ACE_NEW_RETURN (peer_handler,
                    TP_Logging_Handler (reactor ()), -1);
    if (acceptor_.accept (peer_handler->peer ()) == -1) {
      delete peer_handler;
      return -1;
    } else if (peer_handler->open () == -1)
      peer_handler->handle_close (ACE_INVALID_HANDLE, 0);
    return 0;
  }
};
Since TP_Logging_Acceptor inherits from the Logging_Acceptor (page 54) it can override handle_input() to create instances of TP_Logging_Handler.
TP_Logging_Handler. This class provides the following capabilities:
It receives log records from a connected client.
It enqueues the log records into the TP_LOGGING_TASK singleton's synchronized message queue.
The TP_Logging_Handler class is shown below:
class TP_Logging_Handler : public Logging_Event_Handler {
friend class TP_Logging_Acceptor;
Since this class derives from Logging_Event_Handler (page 59 in Section 3.3), it can receive log records when dispatched by a reactor.
The destructor is defined as protected to ensure dynamic allocation. However, since the TP_Logging_Acceptor::handle_input() method deletes objects of this type, TP_Logging_Acceptor must be a friend of this class. Declaring a friend class is appropriate in this case because these classes exhibit a high degree of cohesion, and it's important to maintain the restriction on dynamic allocation of TP_Logging_Handler. The three data members are used to implement the protocol for closing TP_Logging_Handler objects concurrently, as described in Sidebar 46 (page 196).
protected:
  virtual ~TP_Logging_Handler () {} // No-op destructor.

  // Number of pointers to this class instance that currently
  // reside in the <TP_LOGGING_TASK> singleton's message queue.
  int queued_count_;

  // Indicates whether <Logging_Event_Handler::handle_close()>
  // must be called to cleanup and delete this object.
  int deferred_close_;

  // Serialize access to <queued_count_> and <deferred_close_>.
  ACE_Thread_Mutex lock_;
Sidebar 46: Closing TP_Logging_Handlers Concurrently
A challenge with thread pool servers is closing objects that can be accessed concurrently by multiple threads. In our thread pool logging server, TP_Logging_Handler pointers are used by TP_LOGGING_TASK threads. These service threads are separate from the thread running the reactor event loop that's driving callbacks to TP_Logging_Handler. We must therefore ensure that a TP_Logging_Handler object isn't destroyed while there are still pointers to it in use by TP_LOGGING_TASK.
When a logging client closes a connection, TP_Logging_Handler::handle_input() (page 197) returns -1. The reactor then calls the handler's handle_close() method, which ordinarily cleans up resources and deletes the handler. Unfortunately, that would wreak havoc if one or more pointers to that handler were still enqueued or being used by threads in the TP_LOGGING_TASK pool. We therefore use a reference counting protocol to ensure the handler isn't destroyed while a pointer to it is still in use. The UML activity diagram below illustrates the behavior this protocol enforces:
The protocol counts how many times a handler resides in the TP_LOGGING_TASK singleton's message queue. If the count is greater than 0 when the logging client socket is closed, TP_Logging_Handler::handle_close() can't yet destroy the handler. Later, as the TP_LOGGING_TASK processes each log record, the handler's reference count is decremented. When the count reaches 0, the handler can finish processing the close request that was deferred earlier.
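The protocol reduces to a small state machine guarded by one lock. The sketch below restates it in standard C++ under stated assumptions: Closable_Handler and its three methods are hypothetical names, and each method returns a flag instead of destroying the object, so that the decision logic can be seen in isolation.

```cpp
#include <cassert>
#include <mutex>

class Closable_Handler {   // Hypothetical; isolates the counting protocol.
public:
  // Reactor thread: one more pointer to this handler is in the queue.
  void enqueued() {
    std::lock_guard<std::mutex> guard(lock_);
    ++queued_count_;
  }

  // Reactor thread: the client closed the connection. Returns true if
  // the handler can be destroyed now; otherwise the close is deferred.
  bool connection_closed() {
    std::lock_guard<std::mutex> guard(lock_);
    if (queued_count_ == 0) return true;
    deferred_close_ = true;
    return false;
  }

  // Worker thread: one queued record has been processed. Returns true
  // if a previously deferred close should be carried out now.
  bool record_processed() {
    std::lock_guard<std::mutex> guard(lock_);
    assert(queued_count_ > 0);
    --queued_count_;
    return queued_count_ == 0 && deferred_close_;
  }

private:
  std::mutex lock_;
  int queued_count_ = 0;
  bool deferred_close_ = false;
};
```

With two records queued when the client disconnects, the close is deferred, the first processed record leaves the handler alive, and the second one triggers the deferred close.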
The public part of the class defines the constructor and a pair of methods dispatched by the reactor when certain events occur.
public:
  TP_Logging_Handler (ACE_Reactor *reactor)
    : Logging_Event_Handler (reactor),
      queued_count_ (0),
      deferred_close_ (0) {}

  // Called when input events occur, e.g., connection or data.
  virtual int handle_input (ACE_HANDLE);

  // Called when this object is destroyed, e.g., when it's
  // removed from a reactor.
  virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
};

ACE_FACTORY_DECLARE (TPLS, TP_Logging_Handler)
TP_Logging_Handler::handle_input() plays the reactive role in the Half-Sync/Half-Async pattern. It differs from the Logging_Event_Handler::handle_input() method (page 59) since it doesn't process a log record immediately after receiving it. Instead, it inserts each log record into the TP_LOGGING_TASK singleton's message queue, where it will be processed concurrently. However, while processing the log record, the TP_LOGGING_TASK needs to access this handler's log file (inherited from Logging_Event_Handler). To facilitate this, handle_input() combines the log record with a message block containing the handler's pointer and inserts the resulting composite message at the end of the TP_LOGGING_TASK singleton's message queue, as shown below:
 1 int TP_Logging_Handler::handle_input (ACE_HANDLE) {
 2   ACE_Message_Block *mblk = 0;
 3   if (logging_handler_.recv_log_record (mblk) != -1) {
 4     ACE_Message_Block *log_blk = 0;
 5     ACE_NEW_RETURN
 6       (log_blk, ACE_Message_Block
 7                   (ACE_reinterpret_cast (char *, this)), -1);
 8     log_blk->cont (mblk);
 9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
10     if (TP_LOGGING_TASK::instance ()->put (log_blk) == -1)
11       { log_blk->release (); return -1; }
12     ++queued_count_;
13     return 0;
14   } else return -1;
15 }
Lines 2-3 Read a log record from a connected socket into a dynamically allocated ACE_Message_Block.
Lines 4-8 Create an ACE_Message_Block called log_blk that contains a pointer to this handler. The ACE_Message_Block constructor simply "borrows" the this pointer and sets the ACE_Message_Block::DONT_DELETE flag internally to ensure that the handler itself isn't destroyed when the message block is released in TP_Logging_Task::svc() (page 199). The mblk is attached to log_blk's continuation chain to form a composite message.
Lines 9-10 Use an ACE_Guard to acquire the lock_ that serializes access to queued_count_. To avoid a race condition in which a service thread processes the record before queued_count_ can be incremented, the lock is acquired before calling put() to insert the composite message block into the TP_LOGGING_TASK singleton's message queue.
Lines 11-13 Release the log_blk resources and return -1 if put() fails. If it succeeds, increment the count of how many times the handler appears in the TP_LOGGING_TASK's queue. In either case, the return statement causes the guard to release lock_.
Line 14 If the client closes the connection, or a serious error occurs, handle_input() returns -1. This value causes the reactor to call TP_Logging_Handler::handle_close(), which implements a key portion of the protocol for closing TP_Logging_Handlers concurrently, as described in Sidebar 46 (page 196). The handle_close() method is shown below:
 1 int TP_Logging_Handler::handle_close (ACE_HANDLE handle,
 2                                       ACE_Reactor_Mask) {
 3   int close_now = 0;
 4   if (handle != ACE_INVALID_HANDLE) {
 5     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
 6     if (queued_count_ == 0) close_now = 1;
 7     else deferred_close_ = 1;
 8   } else {
 9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
10     queued_count_--;
11     if (queued_count_ == 0) close_now = deferred_close_;
12   }
13
14   if (close_now) return Logging_Event_Handler::handle_close ();
15   return 0;
16 }
Line 3 The close_now variable records whether the Logging_Event_Handler::handle_close() method should be called on line 14. This decision depends on the reference count decisions made in the rest of this method.
Lines 4-7 This code runs when the reactor calls handle_close() after handle_input() returns -1. Lines 6-7 are performed within a critical section protected by an ACE_Guard that acquires and releases the lock_ automatically within the scope of the if clause. If queued_count_ equals 0, there are no references to this object remaining in the TP_LOGGING_TASK, so we set the close_now local variable to 1. Otherwise, we set the deferred_close_ data member to note that as soon as the reference count reaches 0 this handler should be destroyed, since the client has already closed its socket endpoint. As log records are processed, the TP_Logging_Task::svc() method (page 199) calls handle_close() again and executes the lines described below.
Lines 8-12 The handle equals ACE_INVALID_HANDLE when handle_close() is called in TP_Logging_Task::svc() (page 199) after the handler has been removed from the task's message queue. We therefore decrement queued_count_. If the count is now 0, we record whether we need to finish processing a close request that was deferred earlier. As in lines 5-7, we use an ACE_Guard to acquire and release the lock_ automatically within the scope of the else clause.
Line 14 If the local variable close_now is true, call Logging_Event_Handler::handle_close() (page 60), which closes the socket and log file and then deletes the handler.
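The counting rules walked through above can be isolated from ACE and exercised on their own. The following is a minimal sketch in standard C++, not the book's implementation: DeferredCloseCounter and its three method names are hypothetical, std::mutex stands in for ACE_Thread_Mutex, and boolean return values stand in for the call to Logging_Event_Handler::handle_close().

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the counting logic in TP_Logging_Handler.
// Not ACE code: std::mutex replaces ACE_Thread_Mutex, and the boolean
// results replace the call to Logging_Event_Handler::handle_close().
class DeferredCloseCounter {
public:
  // Reactor side: a record for this handler is about to be queued.
  // The lock is held across the enqueue so that a worker cannot
  // report the record as processed before the count is incremented.
  template <typename Enqueue>
  bool record_queued(Enqueue enqueue) {
    std::lock_guard<std::mutex> guard(lock_);
    if (!enqueue()) return false;  // enqueue failed, count unchanged
    ++queued_count_;
    return true;
  }

  // Reactor side: the client closed its socket.
  // Returns true if the handler may be destroyed right away.
  bool client_closed() {
    std::lock_guard<std::mutex> guard(lock_);
    if (queued_count_ == 0) return true;
    deferred_close_ = true;        // remember the close for later
    return false;
  }

  // Worker side: one queued record has been processed.
  // Returns true if this call must finish a deferred close.
  bool record_processed() {
    std::lock_guard<std::mutex> guard(lock_);
    assert(queued_count_ > 0);
    --queued_count_;
    return queued_count_ == 0 && deferred_close_;
  }

private:
  std::mutex lock_;
  int queued_count_ = 0;
  bool deferred_close_ = false;
};
```

With two records queued, client_closed() returns false, the first record_processed() returns false, and the second returns true, which is the point at which the deferred close would be completed. The callable passed to record_queued() must not call back into the same counter, since the lock is not recursive.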
Now that we've shown the TP_Logging_Handler class, we can show the TP_Logging_Task::svc() method, which runs concurrently in each of the worker threads and implements the synchronous role in the Half-Sync/Half-Async pattern. This method runs its own event loop that blocks on the synchronized message queue. After a message is enqueued by TP_Logging_Handler::handle_input() (page 197), it will be dequeued by an available worker thread and written to the appropriate log file corresponding to the client, as shown below.
 1 int TP_Logging_Task::svc () {
 2   for (ACE_Message_Block *log_blk; getq (log_blk) != -1; ) {
 3     TP_Logging_Handler *tp_handler = ACE_reinterpret_cast
 4       (TP_Logging_Handler *, log_blk->rd_ptr ());
 5     Logging_Handler logging_handler (tp_handler->log_file ());
 6     logging_handler.write_log_record (log_blk->cont ());
 7     log_blk->release ();
 8     tp_handler->handle_close (ACE_INVALID_HANDLE, 0);
 9   }
10   return 0;
11 }
Lines 2-4 Call the getq() method, which blocks until a message block is available. As shown in Figure 6.11, each message block is a composite message containing three message blocks chained together via their continuation pointers in the following order:
Figure 6.11. Message Block Chain of Log Record Information
A pointer to the TP_Logging_Handler that contains the log file where the log record will be written
The hostname of the connected client
The marshaled log record contents
Lines 5-6 Initialize logging_handler with the log_file and then call Logging_Handler::write_log_record(), which writes the log record to the log file. The write_log_record() method is responsible for releasing its message block chain, as shown in Chapter 4 of C++NPv1.
Lines 7-8 Call log_blk->release() to reclaim allocated resources. Since the TP_Logging_Handler pointer is borrowed rather than allocated dynamically, however, we must explicitly call TP_Logging_Handler::handle_close() (page 198) on tp_handler. This method decreases the TP_Logging_Handler reference count and cleans up the object properly, using the protocol described in Sidebar 46 (page 196).
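The shape of this worker loop can be tried out without ACE. The sketch below is an illustration under stated assumptions rather than the book's code: SketchHandler, SketchMessage, SketchQueue, sketch_svc(), and sketch_run() are hypothetical names, a plain struct replaces the chained ACE_Message_Blocks, and a counter replaces the call to handle_close (ACE_INVALID_HANDLE, 0). Unlike the real task, closing this queue lets workers drain pending messages before they exit.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical per-client handler: owns the "log file" (a vector of
// strings here) and counts completion notices from workers.
struct SketchHandler {
  std::mutex lock;
  std::vector<std::string> log_file;
  int processed = 0;
};

// Composite message: a borrowed handler pointer plus the record data,
// playing the role of log_blk and its continuation chain.
struct SketchMessage {
  SketchHandler *handler;  // borrowed, never deleted by the worker
  std::string hostname;
  std::string record;
};

// Minimal synchronized queue. dequeue() blocks until a message
// arrives, and returns false once the queue is closed and empty.
class SketchQueue {
public:
  void enqueue(SketchMessage m) {
    {
      std::lock_guard<std::mutex> g(lock_);
      items_.push_back(std::move(m));
    }
    ready_.notify_one();
  }
  void close() {
    {
      std::lock_guard<std::mutex> g(lock_);
      closed_ = true;
    }
    ready_.notify_all();
  }
  bool dequeue(SketchMessage &out) {
    std::unique_lock<std::mutex> g(lock_);
    ready_.wait(g, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return false;
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }
private:
  std::mutex lock_;
  std::condition_variable ready_;
  std::deque<SketchMessage> items_;
  bool closed_ = false;
};

// Worker event loop, analogous in shape to TP_Logging_Task::svc().
inline void sketch_svc(SketchQueue &queue) {
  SketchMessage msg{};
  while (queue.dequeue(msg)) {
    assert(msg.handler != nullptr);
    std::lock_guard<std::mutex> g(msg.handler->lock);
    msg.handler->log_file.push_back(msg.hostname + ": " + msg.record);
    ++msg.handler->processed;  // stands in for the handle_close() call
  }
}

// Spawns `workers` threads, closes the queue, and returns once every
// message that was already queued has been written.
inline void sketch_run(SketchQueue &queue, int workers) {
  std::vector<std::thread> pool;
  for (int i = 0; i < workers; ++i)
    pool.emplace_back(sketch_svc, std::ref(queue));
  queue.close();
  for (auto &t : pool) t.join();
}
```

Because several workers may hold messages for the same handler at once, the sketch serializes writes with a per-handler mutex. That is a choice of this sketch, made to keep it self-contained.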
TP_Logging_Server. This facade class inherits from ACE_Service_Object, contains a Reactor_Logging_Server, and uses the TP_LOGGING_TASK singleton, as shown below.
class TP_Logging_Server : public ACE_Service_Object {
protected:
  // Contains the reactor, acceptor, and handlers.
  typedef Reactor_Logging_Server<TP_Logging_Acceptor>
          LOGGING_DISPATCHER;
  LOGGING_DISPATCHER *logging_dispatcher_;

public:
  TP_Logging_Server (): logging_dispatcher_ (0) {}

  // Other methods defined below...
};
The TP_Logging_Server::init() hook method enhances the reactive logging server implementation from Chapter 3 as follows:
virtual int init (int argc, ACE_TCHAR *argv[]) {
  int i;
  char **array = 0;
  ACE_NEW_RETURN (array, char*[argc], -1);
  ACE_Auto_Array_Ptr<char *> char_argv (array);
  for (i = 0; i < argc; ++i)
    char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
  ACE_NEW_NORETURN
    (logging_dispatcher_,
     TP_Logging_Server::LOGGING_DISPATCHER
       (i, char_argv.get (), ACE_Reactor::instance ()));
  for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
  if (logging_dispatcher_ == 0) return -1;
  else return TP_LOGGING_TASK::instance ()->open ();
}
This init() method is similar to the one on page 122. It allocates an instance of TP_Logging_Server::LOGGING_DISPATCHER and stores its pointer in the logging_dispatcher_ member. It also calls TP_Logging_Task::open() to prespawn a pool of worker threads that process log records concurrently.
The TP_Logging_Server::fini() method is shown next:
1 virtual int fini () {
2   TP_LOGGING_TASK::instance ()->flush ();
3   TP_LOGGING_TASK::instance ()->wait ();
4   TP_LOGGING_TASK::close ();
5   delete logging_dispatcher_;
6   return 0;
7 }
Line 2 Call the flush() method of the TP_LOGGING_TASK singleton, thereby closing the message queue associated with the task, which deletes all the queued messages and signals the threads in the pool to exit.
Lines 3-4 Use the ACE_Thread_Manager's barrier synchronization feature to wait for the pool of threads spawned by TP_Logging_Task::open() to exit and then explicitly close the singleton because this DLL is about to be unlinked.
Lines 5-6 Delete the logging_dispatcher_ allocated in init() and return. For brevity, we omit the suspend(), resume(), and info() hook methods, which are similar to those shown in earlier examples.
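The ordering in fini() matters: workers must be told to stop before anyone waits for them. The sketch below illustrates that ordering in standard C++. It is not ACE code: SketchPool and its open(), put(), flush(), and wait() methods are hypothetical and only borrow the names used in the text, and std::thread::join() stands in for the ACE_Thread_Manager barrier.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical pool, not ACE code. open() prespawns workers, flush()
// discards pending work and tells workers to exit, wait() joins them.
class SketchPool {
public:
  ~SketchPool() { flush(); wait(); }

  void open(int n_threads) {
    for (int i = 0; i < n_threads; ++i)
      threads_.emplace_back([this] { svc(); });
  }

  // Returns false once flush() has closed the queue.
  bool put(int item) {
    std::lock_guard<std::mutex> g(lock_);
    if (closed_) return false;
    items_.push_back(item);
    ready_.notify_one();
    return true;
  }

  // Discards pending items, closes the queue, and wakes every worker.
  // Returns how many pending items were discarded.
  std::size_t flush() {
    std::size_t dropped = 0;
    {
      std::lock_guard<std::mutex> g(lock_);
      dropped = items_.size();
      items_.clear();
      closed_ = true;
    }
    ready_.notify_all();
    return dropped;
  }

  // Blocks until every worker has returned from svc().
  void wait() {
    for (auto &t : threads_) {
      assert(t.joinable());
      t.join();
    }
    threads_.clear();
  }

  std::size_t running() const { return threads_.size(); }

private:
  void svc() {
    for (;;) {
      std::unique_lock<std::mutex> g(lock_);
      ready_.wait(g, [this] { return closed_ || !items_.empty(); });
      if (closed_) return;
      items_.pop_front();  // "process" one item
    }
  }

  std::mutex lock_;
  std::condition_variable ready_;
  std::deque<int> items_;
  bool closed_ = false;
  std::vector<std::thread> threads_;
};
```

Calling wait() before flush() in this sketch would block forever while the queue is open, since idle workers sleep on the condition variable until the queue is closed.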
Finally, we place ACE_FACTORY_DEFINE into TP_Logging_Server.cpp.
ACE_FACTORY_DEFINE (TPLS, TP_Logging_Server)
This macro automatically defines the _make_TP_Logging_Server() factory function that's used in the following svc.conf file:
dynamic TP_Logging_Server Service_Object *
TPLS:_make_TP_Logging_Server() "$TP_LOGGING_SERVER_PORT"
This file directs the ACE Service Configurator framework to configure the thread pool logging server via the following steps:
Dynamically link the TPLS DLL into the address space of the process.
Use the ACE_DLL class to extract the _make_TP_Logging_Server() factory function from the TPLS DLL symbol table.
This function is called to obtain a pointer to a dynamically allocated TP_Logging_Server.
The Service Configurator framework calls the TP_Logging_Server::init() hook method through this pointer, passing the value of the TP_LOGGING_SERVER_PORT environment variable as its single argument. This string designates the port number where the logging server listens for client connection requests.
If init() succeeds, the TP_Logging_Server pointer is stored in the ACE_Service_Repository under the name "TP_Logging_Server".
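The steps above can be modeled in a few lines of standard C++. This is a sketch under loud assumptions, not the ACE Service Configurator: SketchService, SketchConfigurator, and make_sketch_logging_server() are hypothetical, and a std::map of factory functions stands in for the DLL symbol table so that the example needs no dynamic linking.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical counterpart of a service object with an init() hook.
struct SketchService {
  virtual ~SketchService() {}
  virtual int init(const std::vector<std::string> &args) = 0;
};

// Signature of a factory function exported by a "DLL".
typedef SketchService *(*SketchFactory)();

// Hypothetical service that records the port string passed to init().
struct SketchLoggingServer : SketchService {
  std::string port;
  int init(const std::vector<std::string> &args) override {
    if (args.size() != 1 || args[0].empty()) return -1;
    port = args[0];
    return 0;
  }
};

inline SketchService *make_sketch_logging_server() {
  return new SketchLoggingServer;
}

class SketchConfigurator {
public:
  // Stands in for linking a DLL and reading its symbol table.
  void export_symbol(const std::string &symbol, SketchFactory factory) {
    symbols_[symbol] = factory;
  }

  // Looks up the factory, creates the service, calls init() with one
  // argument, and stores the service by name only if init() succeeds.
  int configure(const std::string &name, const std::string &symbol,
                const std::string &arg) {
    assert(!name.empty());
    std::map<std::string, SketchFactory>::const_iterator it =
        symbols_.find(symbol);
    if (it == symbols_.end()) return -1;
    std::unique_ptr<SketchService> svc(it->second());
    if (!svc || svc->init({arg}) != 0) return -1;
    repository_[name] = std::move(svc);
    return 0;
  }

  SketchService *find(const std::string &name) const {
    std::map<std::string, std::unique_ptr<SketchService> >::const_iterator
        it = repository_.find(name);
    return it == repository_.end() ? nullptr : it->second.get();
  }

private:
  std::map<std::string, SketchFactory> symbols_;
  std::map<std::string, std::unique_ptr<SketchService> > repository_;
};
```

A service whose factory symbol is missing, or whose init() fails, never reaches the repository in this sketch, which mirrors the condition in the final step of the list.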
Once again, the ACE Service Configurator framework enables us to reuse the main() program from Configurable_Logging_Server.cpp (page 147).