Streaming Data Between Threads Using Pipes
The java.io package provides many classes for writing and reading data to and from streams. Most of the time, the data is written to or read from a file or network connection. Instead of streaming data to a file, a thread can stream it through a pipe to another thread. The first thread writes to the pipe, and the second thread reads from the pipe. A pipe is neither a file nor a network connection, but a structure in memory that holds the data that is written until it is read. Usually, a pipe has a fixed capacity. When the pipe is filled to this capacity, attempts to write more data will block waiting until some data is drained (read) from the pipe by another thread. Similarly, when a pipe is empty, attempts to read data from the pipe will block waiting until another thread writes some data into it.
线程间能够通过pipe传递数据。Pipe不是文件也不是网络连接,它是内存中的一种数据结构,能保存数据等待读取。
There are four pipe-related classes in the java.io package that can be used to stream data between threads: PipedInputStream, PipedOutputStream, PipedReader, and PipedWriter. A PipedInputStream and a PipedOutputStream are hooked together to transfer bytes between threads. A PipedReader and a PipedWriter are hooked together to transfer character data between threads. Figure 8.5 shows the class diagram for these classes. The PipedOutputStream object keeps a reference to the PipedInputStream object it is connected to. Similarly, the PipedWriter object keeps a reference to the PipedReader object it is connected to.
Java.io包中提供了四种pipe相关的类用来在线程间stream data:PipedInputStream,PipedOutputStream,PipedReader,PipedWriter。PipedInputStream,PipedOutputStream相结合传输bytes,PipedReader,PipedWriter组和传输character。PipedOutputStream保存着PipedInputStream的一个引用,PipedWriter保存着PipedReader的一个引用。
A pipe made up of a PipedInputStream and a PipedOutputStream has a capacity to hold 1024 bytes. This means that the thread doing the writing can be up to 1024 bytes ahead of the thread doing the reading. This buffering makes the transfer of data more efficient than a single-byte handoff would be. A pipe made up of a PipedReader and a PipedWriter has a capacity to hold 1024 characters. Again, this buffering allows the thread doing the writing to work a little bit ahead of the thread doing the reading. I discovered the size of the pipes (1024 bytes and 1024 characters) by examining the source code from Sun Microsystems. The API documentation gives no information or guarantees regarding the internal pipe size. Therefore, you should not depend on 1024 being the universal size.
研究源码发现发现pipe的缓冲区为1024bytes/characters,但java API文档中并没有明确给出相关信息。所以,实际编程时不需要考虑缓冲区大小。
PipedInputStream and PipedOutputStream each represent an end of the pipe and need to be connected to each other before data can be sent. Both PipedInputStream and PipedOutputStream have a constructor that takes a reference to the other. It doesn’t matter which is constructed first. You can write either
PipedInputStream and PipedOutputStream代表了pipe的两端,需要互相连接来发送数据。他们都有一个构造函数来保存对方的引用,但是没有先后顺序。
PipedInputStream pipeIn = new PipedInputStream();
PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
Or
PipedOutputStream pipeOut = new PipedOutputStream();
PipedInputStream pipeIn = new PipedInputStream(pipeOut);
Additionally, both ends can be created with their zero-argument constructors and connected together with connect(). You can write either
同时,还提供了connect()方法来建立两端的连接
PipedInputStream pipeIn = new PipedInputStream();
PipedOutputStream pipeOut = new PipedOutputStream();
pipeIn.connect(pipeOut);
or
PipedInputStream pipeIn = new PipedInputStream();
PipedOutputStream pipeOut = new PipedOutputStream();
pipeOut.connect(pipeIn);
If the ends of the pipe are not yet connected to each other, any attempt to read or write will cause an IOException to be thrown. Because of this, it’s generally a good idea to connect the ends right away by using the constructor. PipedReader and PipedWriter connect to each other in the same ways that PipedInputStream and PipedOutputStream do, so the same rules and guidelines apply.
如果两端没有连接便读取数据,会抛出IOException异常。所以,最好在构造函数中便建立连接。PipedReader and PipedWriter也是同样的情况。
PipedBytes
/*
* Created on 2005-7-15
*
* Java Thread Programming - Paul Hyde
* Copyright ? 1999 Sams Publishing
* Jonathan Q. Bo 学习笔记
*
*/
package org.tju.msnrl.jonathan.thread.chapter8;
import java.io.*;
/**
* @author Jonathan Q. Bo from TJU MSNRL
*
* Email:jonathan.q.bo@gmail.com
* Blog:blog.csdn.net/jonathan_q_bo
* blog.yesky.net/jonathanundersun
*
* Enjoy Life with Sun!
*
*/
public class PipeBytes {
public PipeBytes() {
}
public static void writeStuff(OutputStream rawout){
try{
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(rawout));
int[] data = {99,90,34,56,767,78,234,53,67,89,123};
for(int i = 0; i < data.length; i++){
out.writeInt(data[i]);
}
out.flush();
out.close();
}catch(IOException e){
e.printStackTrace();
}
}
public static void readStuff(InputStream rawin){
try{
DataInputStream in = new DataInputStream(new BufferedInputStream(rawin));
boolean eof = false;
while(!eof){
try{
int i = in.readInt();
System.out.println("has read : " + i);
}catch(EOFException e1){
eof = true;
}
}
System.out.println("Read all");
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
try{
final PipedOutputStream out = new PipedOutputStream();
final PipedInputStream in = new PipedInputStream(out);
Runnable runA = new Runnable(){
public void run(){
writeStuff(out);
}
};
Thread threadA = new Thread(runA,"threadA");
threadA.start();
Runnable runB = new Runnable(){
public void run(){
readStuff(in);
}
};
Thread threadB = new Thread(runB,"threadB");
threadB.start();
}catch(IOException e){
e.printStackTrace();
}
}
}
输出结果:
has read : 99
has read : 90
has read : 34
has read : 56
has read : 767
has read : 78
has read : 234
has read : 53
has read : 67
has read : 89
has read : 123
Read all
PipedCharacters
/*
* Created on 2005-7-15
*
* Java Thread Programming - Paul Hyde
* Copyright ? 1999 Sams Publishing
* Jonathan Q. Bo 学习笔记
*
*/
package org.tju.msnrl.jonathan.thread.chapter8;
import java.io.*;
/**
* @author Jonathan Q. Bo from TJU MSNRL
*
* Email:jonathan.q.bo@gmail.com
* Blog:blog.csdn.net/jonathan_q_bo
* blog.yesky.net/jonathanundersun
*
* Enjoy Life with Sun!
*
*/
public class PipeCharacters {
public PipeCharacters() {
}
public static void writeStuff(Writer rawOut){
try{
BufferedWriter out = new BufferedWriter(rawOut);
String[][] line = {
{"java","multithread","programming"},
{"it's","very","intrested"},
{"i","love","it"},
{"ha","ha","ha"}
};
/*输出,每行*/
for(int i = 0; i < line.length; i++){
String[] word = line[i];
/*每行中的每个单词*/
for(int j = 0; j < word.length; j++){
if( j > 0)
out.write(" ");//加上空格
out.write(word[j]);//输出单词
}
out.newLine();//换行
}
out.flush();
out.close();
}catch(IOException e){
e.printStackTrace();
}
}
public static void readStuff(Reader rawIn){
try{
BufferedReader in = new BufferedReader(rawIn);
String line;
while((line = in.readLine()) != null){
System.out.println("has read " + line);
}
System.out.println("Read all");
}catch(IOException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
try{
final PipedWriter out = new PipedWriter();
final PipedReader in = new PipedReader(out);
Runnable runA = new Runnable(){
public void run(){
writeStuff(out);
}
};
Thread threadA = new Thread(runA,"threadA");
threadA.start();
Runnable runB = new Runnable(){
public void run(){
readStuff(in);
}
};
Thread threadB = new Thread(runB,"threadB");
threadB.start();
}catch(Exception e){
e.printStackTrace();
}
}
}
输出结果:
has read java multithread programming
has read it's very intrested
has read i love it
has read ha ha ha
Read all
Using ThreadLocal and InheritableThreadLocal
The value returned from the get() method ThreadLocal depends on which thread invokes the method. InheritableThreadLocal allows these values to be inherited from parent to child thread.
ThreadLocal contains a reference to a WeakHashMap that holds key-value pairs. Weak references were introduced in JDK 1.2, and WeakHashMap takes advantage of them to automatically remove mappings for threads that have died and been de-referenced in all other places. This way, ThreadLocal does not keep track of values for threads that have long since died. In the WeakHashMap, the lookup key is the reference to the Thread and the value stored is a ThreadLocal.Entry object. ThreadLocal.Entry is an inner class to ThreadLocal and is used by ThreadLocal to store the thread-specific values.
InheritableThreadLocal is a subclass of ThreadLocal that provides a mechanism for the thread-specific variable to be inherited from parent thread to child thread. InheritableThreadLocal.Entry is a subclass of ThreadLocal.Entry and is also an inner class. Thread contains a private reference to an InheritableThreadLocal.Entry object and uses it to pass the thread-specific variable down from parent thread to child thread when a new thread is created.
ThreadLocal内置WeakHashMap保存Thread和ThreadLocal.Entry(内部类,保存了线程标识值,不是线程名)。InheritableThreadLocal是ThreadLocal的子类,提供了子父线程间线程标识的继承机制。
ThreadLocal API
ThreadLocal has two public methods. The first one is get():
public Object get()
get() is used to retrieve the thread-specific value. Internally, it looks up to see if the calling thread has a value stored. If is does, it returns that value. If not, it calls the protected method initialValue() to initialize a value for the calling thread, stores it in the internal WeakHashMap for future lookups, and returns the value to the caller. Typically, get() is the only method called on a ThreadLocal object.
public void set(Object value)
set() takes the value passed and stores it in the internal WeakHashMap for future lookups.
ThreadLocal is not abstract, but it generally needs to be subclassed to be useful. This protected method should be overridden in the subclass:
protected Object initialValue()
By default, initialValue() returns null, but in the subclass it can return a more meaningful value.
get()方法用来获得调用此方法的线程的线程标识。调用时,首先查找是否存在此线程的引用,如果有,则返回key/value对中的线程标识,如果没有,则调用protected initialValue()方法,形成此线程标识,保存在WeakHaspMap中待用。默认情况下,initialValue()方法返回null。
ThreadID
ThreadId.java
/*
* Created on 2005-7-15
*
* Java Thread Programming - Paul Hyde
* Copyright ? 1999 Sams Publishing
* Jonathan Q. Bo 学习笔记
*
*/
package org.tju.msnrl.jonathan.thread.chapter8;
/**
* @author Jonathan Q. Bo from TJU MSNRL
*
* Email:jonathan.q.bo@gmail.com
* Blog:blog.csdn.net/jonathan_q_bo
* blog.yesky.net/jonathanundersun
*
* Enjoy Life with Sun!
*
*/
public class ThreadId extends ThreadLocal{
private int nextId;
public ThreadId(){
nextId = 10001;
}
private synchronized Integer getNewId(){
Integer id = new Integer(nextId);
nextId++;
return id;
}
protected Object initialValue(){
print("initial value");
return getNewId();
}
public int getThreadId(){
Integer id = (Integer)get();
return id.intValue();
}
public void print(String msg){
String temp = Thread.currentThread().getName();
System.out.println(temp + " - " + msg);
}
}
ThreadIdMain.java
/*
* Created on 2005-7-15
*
* Java Thread Programming - Paul Hyde
* Copyright ? 1999 Sams Publishing
* Jonathan Q. Bo 学习笔记
*
*/
package org.tju.msnrl.jonathan.thread.chapter8;
/**
* @author Jonathan Q. Bo from TJU MSNRL
*
* Email:jonathan.q.bo@gmail.com
* Blog:blog.csdn.net/jonathan_q_bo
* blog.yesky.net/jonathanundersun
*
* Enjoy Life with Sun!
*
*/
public class ThreadIdMain implements Runnable{
private ThreadId var;
public ThreadIdMain(ThreadId var){
this.var = var;
}
public void run(){
try{
print("get threadid:" + var.getThreadId());
Thread.sleep(2000);
print("second get threadid:" + var.getThreadId());
}catch(InterruptedException e){
e.printStackTrace();
}
}
public void print(String msg){
String temp = Thread.currentThread().getName();
System.out.println(temp + " - " + msg);
}
public static void main(String[] args) {
ThreadId tId = new ThreadId();
ThreadIdMain share = new ThreadIdMain(tId);
try{
Thread threadA = new Thread(share,"threadA");
threadA.start();
Thread.sleep(500);
Thread threadB = new Thread(share,"threadB");
threadB.start();
Thread.sleep(500);
Thread threadC = new Thread(share,"threadC");
threadC.start();
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
输出结果:
threadA - initial value
threadA - get threadid:10001
threadB - initial value
threadB - get threadid:10002
threadC - initial value
threadC - get threadid:10003
threadA - second get threadid:10001
threadB - second get threadid:10002
threadC - second get threadid:10003
InheritableThreadLocal API
InheritableThreadLocal is a subclass of ThreadLocal and allows a thread-specific value to be inherited from the parent thread to the child thread. There are not any public methods on InheritableThreadLocal. It can be used directly as a special kind of ThreadLocal that passes its value from parent thread to child thread.
If you don’t want to use the parent thread’s value directly, you can override
protected Object childValue(Object parentValue)
to produce a customized child value at the time that the child thread is created. By default, childValue() simply returns parentValue.
InheritableThreadLocal父线程向子线程传递线程标识,在此类中无public方法,默认子父线程具有相同的线程标识。如果要定制子线程标识,则override childValue(Object parentValue)方法。
InheritableThreadID
/*
* Created on 2005-7-15
*
* Java Thread Programming - Paul Hyde
* Copyright ? 1999 Sams Publishing
* Jonathan Q. Bo 学习笔记
*
*/
package org.tju.msnrl.jonathan.thread.chapter8;
/**
* @author Jonathan Q. Bo from TJU MSNRL
*
* Email:jonathan.q.bo@gmail.com
* Blog:blog.csdn.net/jonathan_q_bo
* blog.yesky.net/jonathanundersun
*
* Enjoy Life with Sun!
*
*/
public class InheritableThreadId {
public static final int UNIQUE = 101;
public static final int INHERIT = 102;
public static final int SUFFIX = 103;
private ThreadLocal threadLocal;
private int nextId;
public InheritableThreadId(int type){
nextId = 201;
switch(type){
case UNIQUE:
threadLocal = new ThreadLocal(){
protected Object initialValue(){
print("unique ... initial value");
return getNewId();
}
};
break;
case INHERIT:
threadLocal = new InheritableThreadLocal(){
protected Object initialValue(){
print("inherit ... initial value");
return getNewId();
}
};
break;
case SUFFIX:
threadLocal = new InheritableThreadLocal(){
protected Object initialValue(){
print("suffix ... initial value");
return getNewId();
}
protected Object childValue(Object parentValue){
print("suffix ... child value");
return parentValue + "-CH";
}
};
break;
default:
break;
}
}
public static Runnable createTarget(InheritableThreadId id){
final InheritableThreadId var = id;
Runnable pRun = new Runnable(){
public void run(){
print("var.getId():" + var.getId());
Runnable cRun = new Runnable(){
public void run(){
print("var.getId():" + var.getId());
}
};
Thread pThread = Thread.currentThread();
String pName = pThread.getName();
print(pName + "create a child thread " + pName + "-CHILD");
Thread cThread = new Thread(cRun,pName + "-CHILD");
cThread.start();
}
};
return pRun;
}
private synchronized String getNewId(){
String id = "ID" + this.nextId;
nextId++;
return id;
}
public String getId(){
return (String)this.threadLocal.get();
}
public static void print(String msg){
String temp = Thread.currentThread().getName();
System.out.println(temp + " - " + msg);
}
public static void main(String[] args) {
try{
System.out.println("=======ThreadLocal=======");
InheritableThreadId varA = new InheritableThreadId(UNIQUE);
Runnable targetA = createTarget(varA);
Thread threadA = new Thread(targetA,"threadA");
threadA.start();
Thread.sleep(500);
System.out.println("\n===InheritableThreadLocal===");
InheritableThreadId varB = new InheritableThreadId(INHERIT);
Runnable targetB = createTarget(varB);
Thread threadB = new Thread(targetB,"threadB");
threadB.start();
Thread.sleep(500);
System.out.println("\n=InheritableThreadLocal,custum childValue=");
InheritableThreadId varC = new InheritableThreadId(SUFFIX);
Runnable targetC = createTarget(varC);
Thread threadC = new Thread(targetC,"threadC");
threadC.start();
}catch(InterruptedException e1){
e1.printStackTrace();
}
}
}
输出结果:
=======ThreadLocal=======
threadA - unique ... initial value
threadA - var.getId():ID201
threadA - threadAcreate a child thread threadA-CHILD
threadA-CHILD - unique ... initial value
threadA-CHILD - var.getId():ID202
===InheritableThreadLocal===
threadB - inherit ... initial value
threadB - var.getId():ID201
threadB - threadBcreate a child thread threadB-CHILD
threadB-CHILD - var.getId():ID201
=InheritableThreadLocal,custum childValue=
threadC - suffix ... initial value
threadC - var.getId():ID201
threadC - threadCcreate a child thread threadC-CHILD
threadC - suffix ... child value
threadC-CHILD - var.getId():ID201-CH