java.lang.Object
    edu.rit.pj.WorkerConstruct
        edu.rit.pj.WorkerForLoop
            edu.rit.pj.WorkerLongStrideForLoop
public abstract class WorkerLongStrideForLoop
extends WorkerForLoop
Class WorkerLongStrideForLoop is the abstract base class for one variation of a worker for loop that is executed inside a WorkerRegion. The loop index data type is long. The loop stride is explicitly specified.
To execute a worker for loop, create a WorkerRegion object; create an instance of a concrete subclass of class WorkerLongStrideForLoop; and pass this instance to the worker region's execute() method. Either every worker team thread must call the worker region's execute() method with identical arguments, or no thread may call it. You can do all this using an anonymous inner class; for example:
new WorkerRegion()
{
    . . .
    public void run()
    {
        . . .
        execute (0L, 98L, 2L, new WorkerLongStrideForLoop()
        {
            // Thread local variable declarations
            . . .
            public void start()
            {
                // Per-thread pre-loop initialization code
                . . .
            }
            public void run (long first, long last, long stride)
            {
                // Loop code
                . . .
            }
            public void finish()
            {
                // Per-thread post-loop finalization code
                . . .
            }
        });
    }
    . . .
}
In each process of a cluster parallel program, the worker team has one or more worker threads. Every worker thread in every process has a unique worker tag, going from tag 0 for the first worker thread in the first process to tag K−1 for the last worker thread in the last process, where K is the total number of worker threads in all the processes. In addition, in one process there is a master thread. The worker and master threads all call the worker region's execute() method to execute the worker for loop. However, the worker and master threads differ in their actions.
The master thread does the following. The master obtains the worker for loop's schedule as returned by the schedule() method. The range of loop indexes is divided into "chunks" and the chunks are apportioned among the workers in accordance with the schedule. The master repeatedly sends "tasks" to the workers and receives "responses" from the workers. To send a task to a particular worker, the master (1) sends a message containing the chunk index range to the worker's process; and (2) calls the worker for loop's sendTaskInput() method. This method's default implementation does nothing, but it can be overridden to send additional task input data to the worker. To receive a response from a particular worker, the master (1) receives a message containing the chunk index range from the worker's process; and (2) calls the worker for loop's receiveTaskOutput() method. This method's default implementation does nothing, but it can be overridden to receive additional task output data from the worker. Once all tasks have been sent to the workers and all responses have been received from the workers, the master returns from the worker region's execute() method.
Each worker thread does the following. The worker calls the worker for loop's start() method once before beginning any loop iterations. The worker then repeatedly receives tasks from the master and sends responses to the master. To receive a task from the master, the worker (1) receives a message containing the chunk index range from the master's process; and (2) calls the worker for loop's receiveTaskInput() method. This method's default implementation does nothing, but it can be overridden to receive additional task input data from the master. The worker then calls the worker for loop's run() method, passing in the chunk's first index, last index, and stride. When the run() method returns, the worker sends the response to the master. To send the response, the worker (1) sends a message containing the chunk index range to the master's process; and (2) calls the worker for loop's sendTaskOutput() method. This method's default implementation does nothing, but it can be overridden to send additional task output data to the master. Once all tasks have been received from the master and all responses have been sent to the master, the worker calls the worker for loop's finish() method. (Unlike a ParallelTeam's threads, the workers do not synchronize with each other at a barrier at this point.) The worker then returns from the worker region's execute() method.
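The call order a worker follows can be illustrated with a small stand-alone sketch. It does not use the PJ library: the nested Loop class, driveWorker(), and trace() are hypothetical stand-ins, the hook signatures are simplified, and the chunk index range messages exchanged with the master are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical stand-ins, not the PJ classes.
class WorkerLifecycleDemo {
    // Simplified version of the hooks a worker for loop exposes.
    abstract static class Loop {
        void start() throws Exception {}
        void receiveTaskInput(long first, long last) throws Exception {}
        abstract void run(long first, long last, long stride) throws Exception;
        void sendTaskOutput(long first, long last) throws Exception {}
        void finish() throws Exception {}
    }

    // Drives one worker through the documented order:
    // start(), then for each task receiveTaskInput(), run(), sendTaskOutput(),
    // and finally finish().
    static void driveWorker(Loop loop, long[][] chunks, long stride) throws Exception {
        loop.start();
        for (long[] chunk : chunks) {
            loop.receiveTaskInput(chunk[0], chunk[1]);
            loop.run(chunk[0], chunk[1], stride);
            loop.sendTaskOutput(chunk[0], chunk[1]);
        }
        loop.finish();
    }

    // Records the name of each hook as it is called.
    static List<String> trace(long[][] chunks, long stride) {
        final List<String> log = new ArrayList<>();
        Loop loop = new Loop() {
            @Override void start() { log.add("start"); }
            @Override void receiveTaskInput(long f, long l) {
                log.add("receiveTaskInput " + f + ".." + l);
            }
            @Override void run(long f, long l, long s) {
                log.add("run " + f + ".." + l);
            }
            @Override void sendTaskOutput(long f, long l) {
                log.add("sendTaskOutput " + f + ".." + l);
            }
            @Override void finish() { log.add("finish"); }
        };
        try {
            driveWorker(loop, chunks, stride);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // Two hypothetical chunks handed to the same worker.
        for (String line : trace(new long[][] {{0L, 22L}, {24L, 48L}}, 2L)) {
            System.out.println(line);
        }
    }
}
```

With the two chunks shown, the trace begins with start, repeats the receiveTaskInput, run, sendTaskOutput triple once per chunk, and ends with finish.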
If the worker for loop has a fixed schedule (in which there is exactly one chunk with a predetermined index range for each worker), then the messages containing the chunk index range are omitted, and each worker gets its chunk index range directly from the fixed schedule. However, the task input data (if any) and task output data (if any) are still sent and received.
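As an illustration of "exactly one chunk with a predetermined index range for each worker," the sketch below splits a strided index range as evenly as possible among K workers. The even-split rule, the class name, and chunkFor() are assumptions made for this example; the schedule actually used by the library may partition the range differently.

```java
// Sketch only: one plausible way to derive a per-worker chunk without messages.
class FixedChunkDemo {
    // Returns {chunkFirst, chunkLast} for worker w of k workers,
    // or null if worker w receives no iterations.
    static long[] chunkFor(long first, long last, long stride, int w, int k) {
        // Number of iterations in first..last inclusive, stepping by stride.
        long n = (last < first) ? 0L : (last - first) / stride + 1L;
        long lo = w * n / k;        // first iteration number owned by worker w
        long hi = (w + 1) * n / k;  // one past the last iteration number owned
        if (lo >= hi) {
            return null;
        }
        return new long[] {first + lo * stride, first + (hi - 1) * stride};
    }

    public static void main(String[] args) {
        int k = 4; // hypothetical total number of workers
        for (int w = 0; w < k; ++w) {
            long[] c = chunkFor(0L, 98L, 2L, w, k);
            System.out.println("worker " + w + ": " + c[0] + ".." + c[1]);
        }
        // Prints:
        // worker 0: 0..22
        // worker 1: 24..48
        // worker 2: 50..72
        // worker 3: 74..98
    }
}
```

Because every worker can evaluate the same rule locally, no chunk index range needs to travel between master and worker, which is the property the paragraph above describes.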
Each message described above is sent with a message tag equal to W+T, where W is the worker index (0 .. K−1) and T is the "tag offset." The tag offset is Integer.MIN_VALUE by default, but this can be changed by overriding the tagOffset() method. Thus, the message tags fall in the range T .. K−1+T, where K is the total number of workers in all the processes. The program should not use message tags in this range except to send and receive the messages described above.
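The tag arithmetic can be checked with a short worked example. The class and method names below are hypothetical, and the worker count of 4 is an arbitrary choice; only the W+T formula and the default offset come from the description above.

```java
// Sketch only: mirrors the W+T tag formula; not PJ library code.
class WorkerTagDemo {
    // Default tag offset stated in the documentation.
    static final int DEFAULT_TAG_OFFSET = Integer.MIN_VALUE;

    // Message tag for worker index w (0 .. K-1) under tag offset t.
    static int messageTag(int w, int t) {
        return w + t;
    }

    public static void main(String[] args) {
        int k = 4; // hypothetical total number of workers
        int lowest = messageTag(0, DEFAULT_TAG_OFFSET);
        int highest = messageTag(k - 1, DEFAULT_TAG_OFFSET);
        System.out.println("Reserved tags: " + lowest + " .. " + highest);
        // Prints: Reserved tags: -2147483648 .. -2147483645
    }
}
```

With the default offset, the reserved tags sit at the very bottom of the int range, so a program that uses small non-negative tags for its own messages stays clear of them.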
Note that each worker team thread actually creates its own instance of the worker for loop class and passes that instance to the worker region's execute() method. Thus, any fields declared in the worker for loop class will not be shared by all the workers, but instead will be private to each worker.
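The effect of one instance per thread can be seen in a stand-alone sketch. SumLoop and partialSums() are hypothetical names, plain Java threads stand in for the worker team, and the chunks are supplied by hand rather than by a schedule.

```java
// Sketch only: plain Java threads standing in for worker team threads.
class PerWorkerFieldsDemo {
    // Hypothetical loop body with a field used as a per-worker accumulator.
    static class SumLoop {
        long sum; // belongs to whichever worker created this instance

        void run(long first, long last, long stride) {
            for (long i = first; i <= last; i += stride) {
                sum += i;
            }
        }
    }

    // Each thread creates its own SumLoop, so updates to sum never collide.
    static long[] partialSums(final long[][] chunks, final long stride) {
        final long[] results = new long[chunks.length];
        Thread[] threads = new Thread[chunks.length];
        for (int w = 0; w < chunks.length; ++w) {
            final int worker = w;
            threads[w] = new Thread(() -> {
                SumLoop loop = new SumLoop(); // one instance per worker thread
                loop.run(chunks[worker][0], chunks[worker][1], stride);
                results[worker] = loop.sum;   // each thread writes its own slot
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // join() makes each thread's result visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        long[] sums = partialSums(new long[][] {{0L, 22L}, {24L, 48L}}, 2L);
        System.out.println(sums[0] + " " + sums[1]); // prints: 132 468
    }
}
```

Because the accumulator lives in a per-thread instance, no locking is needed while the loop runs; combining the partial results is a separate step performed after the threads finish.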
The start() method is intended for performing per-thread initialization before starting the loop iterations. If no such initialization is needed, omit the start() method.
The run() method contains the code for the loop. The first and last indexes for a chunk of loop iterations are passed in as arguments. The loop stride, which is always positive, is also explicitly specified as an argument. The worker for loop's run() method must be coded this way:
public void run (long first, long last, long stride)
{
    for (long i = first; i <= last; i += stride)
    {
        // Loop body code
        . . .
    }
}
with the loop indexes running from first to last inclusive and increasing by stride on each iteration.
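As a worked example of the inclusive bounds, the sketch below uses the same loop shape to count how many indexes a chunk visits. The class name and countIterations() are hypothetical, and the chunk 40 .. 58 is an arbitrary illustration.

```java
// Sketch only: counts the indexes visited by the required loop shape.
class StrideLoopDemo {
    // Same loop form as run(): first to last inclusive, stepping by stride.
    static long countIterations(long first, long last, long stride) {
        long count = 0;
        for (long i = first; i <= last; i += stride) {
            ++count;
        }
        return count;
    }

    public static void main(String[] args) {
        // The whole range from the earlier example: 0, 2, 4, ..., 98.
        System.out.println(countIterations(0L, 98L, 2L));  // prints: 50
        // A single hypothetical chunk: 40, 42, ..., 58.
        System.out.println(countIterations(40L, 58L, 2L)); // prints: 10
    }
}
```

Using `<` instead of `<=` would silently drop the last index of every chunk, which is why the loop form is prescribed.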
The finish() method is intended for performing per-thread finalization after finishing the loop iterations. If no such finalization is needed, omit the finish() method.
If the worker for loop's start(), run(), or finish() method throws an exception in one of the worker threads, then that worker thread executes no further code in the loop, and the worker region's execute() method throws that same exception in that thread. However, the other worker threads in the worker team continue to execute.
| Constructor Summary | |
|---|---|
| WorkerLongStrideForLoop() | Construct a new worker for loop. |
| Method Summary | | |
|---|---|---|
| void | finish() | Perform per-thread finalization actions after finishing the loop iterations. |
| void | receiveTaskInput(LongRange range, Comm comm, int mRank, int tag) | Receive additional input data associated with a task. |
| void | receiveTaskOutput(LongRange range, Comm comm, int wRank, int tag) | Receive additional output data associated with a task. |
| abstract void | run(long first, long last, long stride) | Execute one chunk of iterations of this worker for loop. |
| LongSchedule | schedule() | Determine this worker for loop's schedule. |
| void | sendTaskInput(LongRange range, Comm comm, int wRank, int tag) | Send additional input data associated with a task. |
| void | sendTaskOutput(LongRange range, Comm comm, int mRank, int tag) | Send additional output data associated with a task. |
| void | start() | Perform per-thread initialization actions before starting the loop iterations. |
| int | tagOffset() | Returns the tag offset for this worker for loop. |
| Methods inherited from class edu.rit.pj.WorkerConstruct |
|---|
| getThreadCount, getThreadIndex, getTotalThreadCount, isExecutingInParallel, isMasterThread, region, team |

| Methods inherited from class java.lang.Object |
|---|
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Constructor Detail |
|---|
public WorkerLongStrideForLoop()
| Method Detail |
|---|
public LongSchedule schedule()
The schedule() method may be overridden in a subclass to return
the desired schedule. If not overridden, the default is a runtime
schedule (see LongSchedule.runtime()).
public void start()
throws Exception
The start() method may be overridden in a subclass. If not overridden, the start() method does nothing.
Exception - The start() method may throw any exception.
public void sendTaskInput(LongRange range,
Comm comm,
int wRank,
int tag)
throws IOException
The sendTaskInput() method may be overridden in a subclass. If not overridden, the sendTaskInput() method does nothing.
range - Chunk of loop iterations.
comm - Communicator.
wRank - Worker process rank.
tag - Message tag.
IOException - Thrown if an I/O error occurred.
public void receiveTaskInput(LongRange range,
Comm comm,
int mRank,
int tag)
throws IOException
The receiveTaskInput() method may be overridden in a subclass. If not overridden, the receiveTaskInput() method does nothing.
range - Chunk of loop iterations.
comm - Communicator.
mRank - Master process rank.
tag - Message tag.
IOException - Thrown if an I/O error occurred.
public abstract void run(long first,
long last,
long stride)
throws Exception
The run() method must be overridden in a subclass.
first - First loop index.
last - Last loop index.
stride - Loop index stride, always positive.
Exception - The run() method may throw any exception.
public void sendTaskOutput(LongRange range,
Comm comm,
int mRank,
int tag)
throws IOException
The sendTaskOutput() method may be overridden in a subclass. If not overridden, the sendTaskOutput() method does nothing.
range - Chunk of loop iterations.
comm - Communicator.
mRank - Master process rank.
tag - Message tag.
IOException - Thrown if an I/O error occurred.
public void receiveTaskOutput(LongRange range,
Comm comm,
int wRank,
int tag)
throws IOException
The receiveTaskOutput() method may be overridden in a subclass. If not overridden, the receiveTaskOutput() method does nothing.
range - Chunk of loop iterations.
comm - Communicator.
wRank - Worker process rank.
tag - Message tag.
IOException - Thrown if an I/O error occurred.
public void finish()
throws Exception
The finish() method may be overridden in a subclass. If not overridden, the finish() method does nothing.
Exception - The finish() method may throw any exception.
public int tagOffset()
The tagOffset() method may be overridden in a subclass. If not overridden, the tagOffset() method returns a default tag offset of Integer.MIN_VALUE.