Java Thread实现读写同步
(wang hailong)
本文给出一个例子,说明如何使用java thread,处理读写者同步的问题。
文中的源代码能够编译运行。java thread.TestMain
本文可以和另一篇文章《Java Thread应该注意的问题》,进行比较阅读。
1.读写者同步问题
多个读者可以同时读取同一个缓冲区,但当有写者对缓冲区进行写操作时,具有排他性质,其他的读者都不能读取这个缓冲区,其他的写者也不能写这个缓冲区。
2.代码构成和思路
这个例子中包括四个类。
(1)读写资源类RWResource,包含了读写者的计数,共享资源,还有所有的同步代码。
(2)读者类RunnableReader。实现Runnable接口;进行读操作。
(3)写者类RunnableWriter。实现Runnable接口;进行写操作。
(4)测试类TestMain。生成并运行多个写线程和读线程,显示结果。
这个例子对共享资源进行“盒式封装”,把共享资源包含在一个“盒”内。并把所有的同步代码都集中在“盒”里面。读者类和写者类并不进行同步处理,只是申请资源,然后进行读写,读写完成之后,释放资源。
这种方法的优点是共享资源“盒”部分的代码直观易读,紧凑可控,读者类和写者类不用关心同步问题。缺点是共享资源“盒”规定了严格的调用顺序和调用规范,读者类和写者类必须严格遵守共享资源“盒”的调用规范,否则会造成线程死锁,或者资源操作冲突。
不过,即使由读者类和写者类来实现线程同步,如果不注意,也会造成线程死锁,或者资源操作冲突。这是线程的固有问题。:-)
下面给出这四个类的源代码和说明。
3.读写资源类RWResource
package thread;
import java.util.List;
import java.util.ArrayList;
/**
* resource for reading and writing
*/
public class RWResource {
/**
* When readerNumber == 0, there is no one reading or writing.
* When readerNumber > 0, readerNumber means number of readers.
* When readerNumber < 0, it means that some writer is writing.
*/
private int readerNumber = 0;
/**
* the shared resource for writing or reading
*/
private List buffer = null;
public RWResource() {
buffer = new ArrayList(512);
readerNumber = 0;
}
/**
* get buffer for reading.
* should be called before reading
* @return the buffer
*/
public synchronized List getBufferForReading(){
// if some writer is writing, wait until no writer is writing
while(readerNumber < 0){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
// when readerNumber >= 0
readerNumber++;
return buffer;
}
/**
* should be called after reading
*/
public synchronized void finishReading(){
readerNumber--;
if(readerNumber == 0){
this.notifyAll(); // notify possible waiting writers
}
}
/**
* get buffer for writing.
* should be called before writing.
* @return the buffer
*/
public synchronized List getBufferForWriting(){
// if some writer is writing or some reader is reading, wait until no one is writing or reading
while(readerNumber != 0){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
// when readerNumber == 0
readerNumber--; // now readderNumber == -1.
return buffer;
}
/**
* should be called after writing
*/
public synchronized void finishWriting(){
readerNumber++; // readerNumber = -1 + 1 = 0;
// readerNumber must be 0 at this point
this.notifyAll(); // notify possible waiting writers or waiting readers
}
}
读写资源类RWResource提供了4个Synchronized方法,分成两组,供读者和写者调用。
阅读上面的代码,可以看到,读写资源类RWResource通过readerNumber计数控制对共享资源的读写访问。当readerNumber等于0时,说明资源空闲,可以读写;当readerNumber大于0时,说明资源正在被一些读者读取,其他线程可以读,不可以写;当readerNumber小于0时(-1),说明资源被某个写者占用,正在写入,其他线程不可以读,也不可以写。
读者首先调用getBufferForReading()获取共享资源,如果readerNumber大于等于0,表示没有写者占用资源,读者能够获取共享资源,此时,readerNumber加1,表示读者的个数增加;读取之后,必须调用finishReading()释放资源,此时,readerNumber减1,表示读者的个数减少。
写者首先调用getBufferForWriting()获取共享资源,如果readerNumber等于0,表示资源空闲,写者能够获取到共享资源,此时,readerNumber减1,readerNumber的值变为-1,表示资源正在被写入;写者写完资源之后,必须调用,必须调用finishWriting()释放资源,此时,readerNumber加1,readerNumber的值变为0,回到空闲状态。
另外,还请留意读写资源类RWResource代码里面的wait()和notifyAll()调用。
读者在readerNumber小于0的情况下等待,调用Wait();写者在readerNumber大于0的情况下等待,调用Wait()。
在释放资源时( finishReading()或finishWriting() ),如果readerNumber的值变为0,回到空闲状态,调用notifyAll(),通知潜在的等待者——读者或写者。
4.读者类RunnableReader
package thread;
import java.util.List;
import java.util.Iterator;
public class RunnableReader implements Runnable{
private RWResource resource = null;
public RunnableReader() {
}
/**
* must be called before start running
* @param theResource
*/
public void setRWResource(RWResource theResource){
resource = theResource;
}
public void run(){
while(true){
// get the reader's name
String readerName = "[" + Thread.currentThread().getName() + "] ";
// first, get buffer for reading
List buffer = resource.getBufferForReading();
// reading
for(Iterator iterator = buffer.iterator(); iterator.hasNext(); ){
System.out.println(readerName + iterator.next());
}
int articleNumber = buffer.size();
int thinkingTime = articleNumber * 1000;
for(int i = 0; i < thinkingTime; i++){
// thingking hard when reading
}
// finish reading
resource.finishReading();
// rest
try{
Thread.sleep(articleNumber * 50);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
上述代码中的setRWResource()方法传入共享资源——读写资源类。本例采用参数传递的方法,在读者和写者之间共享读写资源类。
Run()方法实现Runnable接口的Run()方法。首先,获取当前读者(线程)的名称,然后,试图获取读资源——resource.getBufferForReading(),获取资源之后,读取buffer里面的所有文章,边读边思考(注意代码里面的for(int i = 0; i < thinkingTime; i++)行,占用cpu时间,表示思考过程),最后,释放资源——resource.finishReading()。读完文章,读者休息一段时间——Thread.sleep(articleNumber * 50)。
注意,在以上的过程中,一定要严格遵守这样的规定,在resource.getBufferForReading()和resource.finishReading()之间,进行读取操作。
5.写者类RunnableWriter
package thread;
import java.util.List;
import java.util.Iterator;
public class RunnableWriter implements Runnable{
private RWResource resource = null;
private int articleNumber = 0;
public RunnableWriter() {
articleNumber = 0;
}
/**
* must be called before start running
* @param theResource
*/
public void setRWResource(RWResource theResource){
resource = theResource;
}
public void run(){
while(true){
// get the writer's name
String writerName = "[" + Thread.currentThread().getName() + "] ";
// first, get buffer for reading
List buffer = resource.getBufferForWriting();
int nWritten = 3; // write 4 articles one time
for(int n = 0; n< nWritten; n++){
// writing
articleNumber++;
String articleName = "article" + articleNumber;
buffer.add(articleName);
System.out.println(writerName + articleName);
int thinkingTime = 10000;
for (int i = 0; i < thinkingTime; i++) {
// thingking hard when writing
}
} // finish writing
resource.finishWriting();
// rest
try{
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
上述代码中的setRWResource()方法传入共享资源——读写资源类。本例采用参数传递的方法,在读者和写者之间共享读写资源类。
Run()方法实现Runnable接口的Run()方法。首先,获取当前写者(线程)的名称,然后,试图获取写资源——resource.getBufferForWriting(),获取资源之后,开始向buffer里面写3篇文章,边写边思考(注意代码里面的for(int i = 0; i < thinkingTime; i++)行,占用cpu时间,表示思考过程),最后,释放资源——resource.finishWriting()。读完文章,写者休息一段时间——Thread.sleep(500)。
注意,在以上的过程中,一定要严格遵守这样的规定,在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。
6.测试类TestMain
package thread;
public class TestMain{
public static void main(String[] args) {
// init 生成共享资源
RWResource resource = new RWResource();
// 生成读者,设置共享资源
RunnableReader reader = new RunnableReader();
reader.setRWResource(resource);
// 生成写者,设置共享资源。
RunnableWriter writer = new RunnableWriter();
writer.setRWResource(resource);
int writerNumber = 5; // 5个写者
int readerNumber = 10; // 10个读者
// start writers 生成5个写线程,并给每个线程起个名字,writer1, writer2…
for(int i = 0; i < writerNumber; i++){
Thread thread = new Thread(writer, "writer" + (i+1));
thread.start();
}
// give writers enough time to think and write articles
try{
Thread.sleep(1000);
}catch(InterruptedException e){
e.printStackTrace();
}
// start readers生成10个读线程,并给每个线程起个名字,reader1, reader2…
for(int i = 0; i < readerNumber; i++){
Thread thread = new Thread(reader, "reader" + (i+1));
thread.start();
}
}
}
以上的测试类TestMain代码,生成并运行多个写线程和读线程,产生的结果可能如下:
[writer1] article1
[writer1] article2
[writer1] article3
…
[reader2] article1
[reader3] article1
[reader4] article1
[reader5] article1
[reader6] article1…
[reader1] article1
…
[writer3] article67
[writer3] article68
[writer3] article69
…
我们可以看到,Writer写的文章的号码从不相同,而且,每个Writer每次写3篇文章,写的过程从来不会被打断。每个读者每次通读所有的文章,经常有几个读者同时读同一篇文章的情况。
6.两种改进思路
这个例子采用“盒”包装的方法,把“锁”(readerNumber)和资源(buffer)放在同一个“盒子”(RWResource)里,但是,“盒子”对资源的包装是不完全的,只是简单地把资源(buffer)返回给读者和写者,并没有对资源(buffer)的访问操作进行封装。
其实,可以对资源(buffer)的访问进行进一步封装,比如,为“盒子”提供String[] readerBuffer()和writeBuffer(String)两个方法,在这两个方法里面,根据锁(readerNumber)的状态,判断读写操作是否合法,这样,代码的整体性会更好。带来的结果是,RWResource类的调用规范和顺序更加严格,必须在resource.getBufferForReading()和resource.finishReading()调用readerBuffer()方法,必须在resource.getBufferForWriting()和resource.finishWriting()之间调用writeBuffer()方法。否则,这两个方法会报错。
这样做会增加RWResource类的复杂度。还有一些设计上的因素需要考虑——readerBuffer和writeBuffer方法是否应该被synchronized修饰?
从上例看到,在resource.getBufferForReading()和resource.finishReading()之间,进行读操作;在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。读操作和写操作部分的代码,是不用同步的。所以,在getBufferForReading()和finishReading()这样的成对操作之间,用synchronized修饰readerBuffer和writeBuffer方法,是多此一举。
但是,从代码的完整性角度来看,因为readerBuffer和writeBuffer方法需要读 “锁”的状态,所以,readerBuffer和writeBuffer方法还是加上synchronized修饰符为好。
考虑到这些因素,本例采取了一种折衷的方法。从形式上看,“锁”和“资源”是聚合在一起的,实际上,两者的操作是分开的,并不相关。“盒子”根据“锁”的状态,调整资源的分配,读者和写者得到资源之后,享有对资源的完全访问权限。
另一个方向是把“资源”和“锁”完全分开。把getBufferForReading方法改成startReading,把getBufferForWriting方法改成startWriting,RWResource不再分配资源,只进行“锁”操作。把RWResource该作RWLock类。
RunnableReader和RunnableWriter类各自增加一个setBuffer()方法,共享buffer资源。这样,RunnableReader和RunnableWriter类就有了两个分开的方法:setBuffer()设置共享资源,setRWLock()设置读写锁。
对本例稍加修改,就可以实现上述的两种思路。限于篇幅,这里不能给出完整的修改代码。
7.补充
本例中对读写资源类RWResource强加了调用顺序。
在resource.getBufferForReading()和resource.finishReading()之间,进行读操作。
在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。
要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我的一个朋友提供了一个信息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read
}
finally {
rw.readlock().release()
}
}
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write
}
finally {
rw.writelock().release()
}
}
}
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。
建议,本文的例子比较简单,可以作为同步原理的入门简介,之后,访问专业化的代码http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
Enjoy it. :-)