This section first presents the motivation for MPI pumps, describes their general structure, and then enumerates the pumps that mpiSpecTcl has and which roles run which pumps.
All of the roles in mpiSpecTcl run a Tcl interpreter that implements an event loop. Once the interpreter is started, control is turned over to that interpreter's main loop. While the main loop is running, however, the processes in mpiSpecTcl still need to communicate: workers must receive event data, the event sink pipeline must receive event parameters from the workers, and all but the root process need to get commands relayed from the root.
Tcl's event loop provides two ways to become aware of activity outside the interpreter itself: notifiers added to the event loop, and threads posting events to the event loop. The notifier interface is quite complex and not well documented. Using threads to post events is simple, assuming thread-safe code can be written.
An MPI pump, therefore, is a thread that blocks receiving messages from an MPI process, converts each message into a Tcl event, and posts that event to the Tcl event loop, where it is processed by an event handling function. Pumps can be built around either MPI_Recv or MPI_Bcast. Note, however, that the lack of selectivity of MPI_Bcast requires that we create a separate MPI communicator for each type of broadcast.
MPI pumps in mpiSpecTcl are paired with an API for sending data to the processes running the pump. The pump code and its API are usually located in the same compilation module in mpiSpecTcl. Recall that all processes in MPI run the same executable image.
The example below shows pseudo code for a skeletal pump and its API. The message sent to the pump is encapsulated in a custom MPI data type.
Example 3-2. Sample Pump and API pseudo code
typedef struct { ... } MessageStruct;            // Payload carried by the pump.

// Lazily create and commit the MPI data type that describes MessageStruct.
static MPI_Datatype messageType() {
    static MPI_Datatype MessageType;
    static bool typeDefined = false;
    if (!typeDefined) {
        // ... describe the fields of MessageStruct to MPI ...
        MPI_Type_create_struct(/* ... the message struct -> */ &MessageType);
        MPI_Type_commit(&MessageType);
        typeDefined = true;
    }
    return MessageType;
}

// API: send a message to the pump running in receivingRank.
void SendMessage(MessageStruct& msg, int receivingRank, int tag) {
    MPI_Send(&msg, 1, messageType(), receivingRank, tag, MPI_COMM_WORLD);
}

// Extended Tcl event: the Tcl_Event header must come first.
struct Event {
    Tcl_Event     event;
    MessageStruct msg;
};

// Event handler invoked from the Tcl event loop in the main thread.
static int eventHandler(Tcl_Event* raw, int flags) {
    Event* event = (Event*)raw;
    processEvent(event->msg);     // Message-specific processing (not shown).
    freeEvent(raw);               // Free any dynamic storage held by the message.
    return 1;                     // Tell Tcl the event was handled; Tcl frees the event.
}

// Pump thread: receive messages and queue them as events to the main thread.
static Tcl_ThreadCreateType eventPump(ClientData d) {
    Tcl_ThreadId mainThread = (Tcl_ThreadId)(d);
    while (1) {
        Event* pEvent = createEvent();   // Tcl_Alloc'd; sets event.proc = eventHandler.
        MPI_Status info;
        MPI_Recv(
            &pEvent->msg, 1, messageType(),
            MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &info
        );
        Tcl_ThreadQueueEvent(mainThread, &pEvent->event, TCL_QUEUE_TAIL);
        Tcl_ThreadAlert(mainThread);     // Wake the main thread's event loop.
    }
    TCL_THREAD_CREATE_RETURN;
}

// Start the pump thread, passing it the id of the thread running the event loop.
void startEventPump() {
    Tcl_ThreadId pumpId;
    Tcl_CreateThread(
        &pumpId, eventPump, (ClientData)Tcl_GetCurrentThread(),
        TCL_THREAD_STACK_DEFAULT, TCL_THREAD_NOFLAGS
    );
}
The explanation below refers to the functions and data structures in the example above. Note that this is the generic structure of a pump and does not reflect any specific pump.
MPI_Send of the message struct requires that a custom MPI data type be defined and committed. messageType does that the first time it is called; that time and all subsequent times it returns the data type handle (MPI_Datatype) for that type. The sample code is a sketch; you should look at the man page for MPI_Type_create_struct for what needs to be done to describe the struct to MPI.
MPI_Send is used by the API function SendMessage to send the data to the designated rank with the designated tag. Note that MPI_Bcast could just as easily be used, as long as it was also used in eventPump to receive the pump data.
Tcl_Event is the structure every event posted to the event loop must begin with. In our case we want to pass additional data (the received message) to the event handler. The Event struct encapsulates both a Tcl_Event header and the additional data we want.
Tcl_Event's proc field specifies the function to call from the event loop; in this case it is our eventHandler function. The handler casts the raw Tcl_Event pointer back to an Event pointer. That simply establishes access to the entire struct, and thus to the message; the handler then does some message-specific processing (not shown) and frees any dynamic storage allocated in the message field. Returning 1 causes the Tcl run time to delete the event struct itself using Tcl_Free.
createEvent allocates an extended event each pass through the pump thread's loop. MPI_Recv is then used to receive data from some sender, and the resulting extended event is dispatched to the Tcl event loop with Tcl_ThreadQueueEvent. Tcl_ThreadQueueEvent needs the id of the thread running the event loop; startEventPump obtains it with Tcl_GetCurrentThread and passes it to the pump thread when that thread is created.
Now that we've seen the general form of an MPI pump, let's enumerate all of the pumps: what they pump, from where they pump, and what is done with the data they pump. We'll also describe which source file implements each pump in case you are interested in looking at source code.
mpiSpecTcl's root process runs an interactive Tcl interpreter. SpecTcl commands, however, are wrapped in one of two wrapper classes: CMPITclCommand, which executes the specified command in all but the root process, and CMPITclCommandAll, which executes the command in all processes, including the root process.
As you might guess, the non-root processes run a pump, the Tcl command pump, which receives commands from the root process and queues them into the event loop, from which they are executed. Note that all substitutions have been done by the time the command is dispatched by the root process, so the actual scope in which the command runs is not very important.
The definitions and the pump itself are implemented in mpi/TclPump.{h,cpp}. This file also provides utilities for determining whether SpecTcl is running under mpirun and, if so, the world rank of the process.
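For illustration, here is a minimal sketch of what such utilities might look like (isMpiApp and myWorldRank are hypothetical names, not the actual TclPump.h API; the sketch assumes MPI_Init has already been called when running under mpirun):

#include <mpi.h>

// True when the program was started under mpirun/mpiexec and MPI has been initialized.
static bool isMpiApp() {
    int initialized(0);
    MPI_Initialized(&initialized);
    return initialized != 0;
}

// World rank of this process; 0 when running serially.
static int myWorldRank() {
    if (!isMpiApp()) return 0;
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    return rank;
}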
Transmission and receipt of commands thus encapsulated is via MPI_Bcast on the world communicator. Arbitrarily long commands are supported by chunking the command into more than one message if necessary.
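A minimal sketch of such chunked broadcasting (the chunk size and the length-first framing are illustrative assumptions, not the actual TclPump wire format):

#include <mpi.h>
#include <algorithm>
#include <string>

static const unsigned long CHUNK_SIZE = 1024;   // Assumed chunk size.

// Called by every process in comm; the root supplies cmd, the others receive it.
static std::string broadcastCommand(const std::string& cmd, int root, MPI_Comm comm) {
    int myRank;
    MPI_Comm_rank(comm, &myRank);

    // Broadcast the total length first so receivers can size their buffer.
    unsigned long total = (myRank == root) ? cmd.size() : 0;
    MPI_Bcast(&total, 1, MPI_UNSIGNED_LONG, root, comm);

    std::string result = (myRank == root) ? cmd : std::string(total, '\0');

    // Broadcast the command in fixed-size chunks until it has all been sent.
    for (unsigned long offset = 0; offset < total; offset += CHUNK_SIZE) {
        unsigned long count = std::min(CHUNK_SIZE, total - offset);
        MPI_Bcast(&result[offset], static_cast<int>(count), MPI_CHAR, root, comm);
    }
    return result;    // On the root this is just cmd; elsewhere it is the received command.
}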
Tcl command execution results in a status (usually one of TCL_OK for success and TCL_ERROR for errors) and a result. The result is some arbitrary Tcl object that, under normal circumstances, is ultimately set via Tcl_SetObjResult or some other related Tcl API function for manipulating the result. When the status is TCL_ERROR, the result is typically an error message string. When it is TCL_OK, the result may be information requested by the command (for example, spectrum -list produces a result that is a list containing spectrum definitions).
The status and result of an MPI-distributed command are a bit more complex. Commands executed by the Tcl pump use MPI_Send to send their status and result to the root process. Note that this round trip of MPI_Bcast to distribute the command and MPI_Send/MPI_Recv to collect the replies is synchronous: the root command blocks until all processes have contributed a status and result.
The root process constructs a single status from the worst of the statuses. The result, in the case where there is an error, is the result from the first process that produced an error. When all processes completed the command with a TCL_OK status, the result is the longest result. Note that typically, on success, all results will be the same.
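A sketch of that aggregation, assuming the per-process statuses and result strings have already been collected into parallel vectors (the function and parameter names are illustrative, not the actual TclPump code):

#include <tcl.h>
#include <string>
#include <vector>

// statuses[i] and results[i] are the reply from process i.
static int aggregateReplies(
    const std::vector<int>&         statuses,
    const std::vector<std::string>& results,
    std::string&                    finalResult
) {
    int finalStatus = TCL_OK;
    for (size_t i = 0; i < statuses.size(); i++) {
        if (statuses[i] != TCL_OK && finalStatus == TCL_OK) {
            // First failing process supplies the status and the result.
            finalStatus = statuses[i];
            finalResult = results[i];
        }
    }
    if (finalStatus == TCL_OK) {
        // All succeeded: use the longest result (usually they are identical).
        for (size_t i = 0; i < results.size(); i++) {
            if (results[i].size() > finalResult.size()) finalResult = results[i];
        }
    }
    return finalStatus;
}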
I did say earlier that the Tcl result is an arbitrary Tcl object. This might seem difficult to return in an MPI message. However, in Tcl every object has a string representation. The Tcl_Obj structure used to encapsulate an object supports shimmering the object into its string representation while simultaneously caching another representation or, if necessary, shimmering the string representation into another internal representation (e.g. a list or a dict). Thus the result is converted to its string representation, and it is that which is sent back to the root process. Note as well that the result can be arbitrarily long and, if needed, is chunked, with result construction in the root process properly handling the fact that the replies may be interleaved in an arbitrary way across the processes in mpiSpecTcl.
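For example, obtaining the string representation of the interpreter result before sending it might look like this (a minimal sketch using only standard Tcl C API calls):

#include <tcl.h>
#include <string>

// Pull the interpreter result out as a string so it can be sent in an MPI message.
static std::string resultString(Tcl_Interp* interp) {
    Tcl_Obj* resultObj = Tcl_GetObjResult(interp);
    int length;
    const char* rep = Tcl_GetStringFromObj(resultObj, &length);   // Shimmers to the string rep.
    return std::string(rep, length);
}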
Finally, these command wrappers are able to detect that they are not running in the MPI environment and, in that case, simply execute the encapsulated command directly. All encapsulated commands, therefore, are transparently portable between serial and parallel SpecTcl runs.
A quick side note: in MPI mode, mpiSpecTcl only registers the BufferDecoder for ring items and therefore only supports analyzing data from NSCLDAQ 10.0 and later. Earlier data may be analyzed by the same executable, but not with parallel workers (just run SpecTcl directly rather than with mpirun/mpiexec).
The ring item pump is the mechanism by which event data are distributed to the worker processes. It is important to note that there are really two classes of ring items: Ring items with type PHYSICS_EVENT, and all other ring item types.
The PHYSICS_EVENT items are distributed to individual workers, while all other ring item types are broadcast to all workers. This allows the event processing pipeline to retrieve data such as the run number or the run title, and allows, for example, OnBegin to execute in all worker processes when a BEGIN_RUN state change item is received.
The ring item pump is, therefore, actually a pair of pumps defined and implemented in Core/RingItemPump.{h,cpp}. The function startRingItemPump, called in all worker processes, starts both pumps.
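Conceptually, the sending side chooses between the two pumps something like this (a sketch: sendRingItem and broadcastRingItem are the RingItemPump API functions described below, while the dispatch wrapper, the declared signatures, and the PHYSICS_EVENT value are assumptions):

#include <cstdint>

// Assumed declarations; the actual signatures are in RingItemPump.h.
void sendRingItem(const void* pItem);        // Pull-protocol send to one worker.
void broadcastRingItem(const void* pItem);   // Broadcast to all workers.

static const std::uint32_t PHYSICS_EVENT = 30;   // Ring item type, per NSCLDAQ DataFormat.h.

// Illustrative dispatch in the sending process.
void dispatchRingItem(const void* pItem, std::uint32_t type) {
    if (type == PHYSICS_EVENT) {
        sendRingItem(pItem);          // One worker gets each physics event.
    } else {
        broadcastRingItem(pItem);     // Every worker must see non-physics items.
    }
}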
Non-physics ring items have a broadcast pump. Those ring items are sent via broadcastRingItem, which uses as many MPI_Bcast calls as needed to send an arbitrary-length ring item to all processes running the ring item pump. The original ring item is reconstituted and queued to the worker process's event loop. The broadcast pump thread function is nonPhysicsThread. Physics ring items are sent to workers using sendRingItem.
The distribution of physics ring items is automatically load leveling. The worker pumps send an initial request for data (MPI_Send). On the sending side, when sendRingItem is called it first does a receive (MPI_Recv) and then sends the ring item to the sender of the request. The request message is just an integer whose value is ignored; in a later release this could become the number of ring items to send. The ring item event handler, after processing a PHYSICS_EVENT, sends a request for the next physics item.
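A minimal sketch of the sending half of that pull protocol (the tags and the single-message send are simplifying assumptions; the real sendRingItem chunks arbitrarily long ring items):

#include <mpi.h>
#include <cstdint>

static const int REQUEST_TAG = 1;     // Assumed tag values for this sketch.
static const int DATA_TAG    = 2;

// Wait for a worker's request, then send it the ring item.
void sendNextPhysicsItem(void* pItem, std::uint32_t nBytes, MPI_Comm eventComm) {
    int request;                       // The payload of the request is ignored.
    MPI_Status status;
    MPI_Recv(&request, 1, MPI_INT, MPI_ANY_SOURCE, REQUEST_TAG, eventComm, &status);

    // Send the ring item to whichever worker asked for it.
    MPI_Send(pItem, static_cast<int>(nBytes), MPI_BYTE,
             status.MPI_SOURCE, DATA_TAG, eventComm);
}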
Of all the pumps, the ring item pump is the most complex. It also maintains statistics in Tcl variables in the sender process so that GUIs can correctly describe the number of items that have been processed and, with an online data source, the fraction of data analyzed.
To simplify communication, event distribution uses a communicator split off from the world communicator that contains only the root process (rank 0 in the split communicator) and the worker processes (ranks 1 and larger in the split communicator). This is actually required, since otherwise the MPI_Bcast calls used to send non-physics data could not be distinguished from other broadcasts. The event sink pipeline is, naturally, not part of this communicator.
Workers, in their event processing pipelines, produce CEvent objects, normally indirectly by setting the values of CTreeParameter objects or elements of CTreeParameterArray objects. These events must be analyzed by the event sink pipeline, which runs in the process whose world rank is MPI_EVENT_SINK_RANK (defined in Core/Globals.h).
CEvent objects look like an extensible array: if an index of that array is referenced that is out of bounds of the current size of the object, the array is enlarged. Since CEvent objects are recycled, they eventually equilibrate in size. CEvent objects have two other features: validity determination and a dope vector of valid elements.
First, each element has a last-modified sequence number, and an overall sequence number indicates the current event number. This allows for two things:
Using an element in an expression other than an assignment to it allows a simple comparison between the element's sequence number and the overall sequence number, trapping and throwing an exception for the use of an uninitialized parameter in a computation.
This scheme provides O(1) invalidation of all elements of the CEvent object: incrementing the overall sequence number invalidates all members of the CEvent, preparing it for re-use with the next event.
The dope vector consists of a vector of the indices of the valid values in the CEvent. It is used by the histogramming engine to determine which histograms may be incremented. When a value is assigned to an element of a CEvent and that element is not yet valid, the element is marked valid and its index is pushed onto the dope vector.
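A stripped-down sketch of this validity scheme and dope vector (illustrative only; the real CEvent class has considerably more machinery):

#include <cstddef>
#include <stdexcept>
#include <vector>

class SketchEvent {
    std::vector<double>   m_values;        // Parameter values.
    std::vector<unsigned> m_sequences;     // Per-element last-modified sequence number.
    std::vector<size_t>   m_dope;          // Indices of elements valid for this event.
    unsigned              m_sequence = 1;  // Current event number.
public:
    void set(size_t i, double value) {
        if (i >= m_values.size()) {            // Extensible-array behavior.
            m_values.resize(i + 1, 0.0);
            m_sequences.resize(i + 1, 0);
        }
        if (m_sequences[i] != m_sequence) {    // First assignment this event:
            m_dope.push_back(i);               // remember it in the dope vector.
        }
        m_sequences[i] = m_sequence;
        m_values[i]    = value;
    }
    double get(size_t i) const {
        if (i >= m_values.size() || m_sequences[i] != m_sequence) {
            throw std::out_of_range("Use of uninitialized parameter");
        }
        return m_values[i];
    }
    const std::vector<size_t>& dopeVector() const { return m_dope; }
    void invalidateAll() {                     // O(1) apart from clearing the dope vector.
        m_sequence++;
        m_dope.clear();
    }
};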
Now we have enough background to look at the parameter pump, which sends the events that workers have analyzed to the event sink pipeline for analysis and other processing.
The definition and implementation of this pump and its API are in Core/EventMessage.{h,cpp}. Each worker, upon processing a set of events into a CEventList (which is effectively a vector of CEvent objects), calls HistogramEvents. This sends the event list to the parameter pump in the event sink pipeline, running in the MPI_EVENT_SINK_RANK world rank process.
HistogramEvents is optimized for the relatively sparse events of larger nuclear science experiments. It uses the dope vector associated with each event to marshal the data into parameter number/parameter value pairs. It is these pairs that are transmitted to the event sink pipeline, which can trivially use them to reconstruct the original CEvent objects.
The parameter pump is a fan-in: all workers send parameters to the same receiver process, which has a pump thread that just receives messages. The first message of any event's parameters is received with an MPI_Recv from MPI_ANY_SOURCE; after that, the remainder of the event, if any, is received by specifying the source of the first chunk as the source of the MPI_Recv. This deals with the possibility that several workers may be simultaneously sending processed events to the event sink.
Vectors of completed parameter index/value pairs are converted to CEvent objects, which are queued to the event handler for processing by the event sink pipeline.
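The receiving side of that fan-in might look roughly like this (the tag, chunk size, and end-of-event convention are assumptions made for the sketch):

#include <mpi.h>
#include <cstdint>
#include <vector>

static const int PARAM_TAG  = 3;      // Assumed message tag.
static const int CHUNK_SIZE = 8192;   // Assumed chunk size in bytes.

// Receive one event's worth of marshaled parameter data.  The first chunk is
// accepted from any worker; subsequent chunks only from that same worker so
// interleaved events from other workers are not mixed together.
std::vector<std::uint8_t> receiveEventBody(MPI_Comm comm) {
    std::vector<std::uint8_t> body;
    std::uint8_t buffer[CHUNK_SIZE];
    MPI_Status status;

    MPI_Recv(buffer, CHUNK_SIZE, MPI_BYTE, MPI_ANY_SOURCE, PARAM_TAG, comm, &status);
    int source = status.MPI_SOURCE;
    int count;
    MPI_Get_count(&status, MPI_BYTE, &count);
    body.insert(body.end(), buffer, buffer + count);

    // In this sketch a short chunk marks the end of the event.
    while (count == CHUNK_SIZE) {
        MPI_Recv(buffer, CHUNK_SIZE, MPI_BYTE, source, PARAM_TAG, comm, &status);
        MPI_Get_count(&status, MPI_BYTE, &count);
        body.insert(body.end(), buffer, buffer + count);
    }
    return body;
}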
Gates are created and modified by Tcl commands, either interactively (through a GUI running in the root process) or via the ReST server (which also runs in the root process), and are distributed to the application via the Tcl command pump described previously. If, however, the display is Xamine, it can also define gates by communicating with the SpecTcl Xamine event handler, which runs in the event sink pipeline process.
It is the job of the API and pump in Core/GatePump.{h,cpp} to distribute these gate definitions to the rest of the application. The gates are sent by the event sink pipeline process via a series of MPI_Bcast calls which send:
The gate name and type.
The parameters the gate depends on.
The points that make up geometric gates.
Note that since Xamine can only create geometric gates, that is the only type of gate the pump and its API can handle. Once all messages that define a gate have been received, information about the gate is queued to the event loop where the gate is re-created.
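A sketch of the sending side of that sequence of broadcasts (struct layouts, message ordering details, and the function name are assumptions, not the actual GatePump API):

#include <mpi.h>
#include <string>
#include <vector>

struct GatePoint { double x; double y; };   // Assumed point layout.

// Root-side sketch: the pump threads in the other processes make matching
// MPI_Bcast calls, using the broadcast counts to size their buffers.
// MPI_Bcast's buffer argument is in/out, hence the const_casts.
void broadcastGateSketch(
    const std::string&            name,
    const std::string&            type,          // e.g. contour or band.
    const std::vector<int>&       parameterIds,
    const std::vector<GatePoint>& points,
    int root, MPI_Comm comm
) {
    // 1. Gate name and type (lengths first so receivers can size buffers).
    int lengths[2] = {static_cast<int>(name.size()), static_cast<int>(type.size())};
    MPI_Bcast(lengths, 2, MPI_INT, root, comm);
    MPI_Bcast(const_cast<char*>(name.data()), lengths[0], MPI_CHAR, root, comm);
    MPI_Bcast(const_cast<char*>(type.data()), lengths[1], MPI_CHAR, root, comm);

    // 2. The parameters the gate depends on.
    int nParams = static_cast<int>(parameterIds.size());
    MPI_Bcast(&nParams, 1, MPI_INT, root, comm);
    MPI_Bcast(const_cast<int*>(parameterIds.data()), nParams, MPI_INT, root, comm);

    // 3. The points that make up the geometric gate (pairs of doubles).
    int nPoints = static_cast<int>(points.size());
    MPI_Bcast(&nPoints, 1, MPI_INT, root, comm);
    MPI_Bcast(const_cast<GatePoint*>(points.data()), nPoints * 2, MPI_DOUBLE, root, comm);
}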
Gates that are accepted or modified by the event sink processing pipeline should fire traces if traces have been set by application scripts in the root process. The gate trace pump provides the mechanism to distribute these trace messages across the application. In Core/GateCommand.cpp, which implements the SpecTcl gate command, the class constructor starts the trace pump if the process is running in the MPI environment. startTracePump only does anything in the root process, in which case it starts the trace thread, the static method CGateCommand::mpiTraceRelayCatchThread.
The histogrammer object, coded in Core/Histogrammer.{h,cpp}, establishes a trace handler, CHistogrammerGateTraceRelay, as a gate observer when it is constructed. Traces it observes ultimately invoke CHistogrammerGateTraceRelay::forwardTrace, which sends sufficient information to the trace pump to allow it to fire the trace. Note that the Histogrammer only exists in the event sink pipeline thread. This collaboration between CGateCommand and CHistogrammer is unique among the pump implementations.