Added bluegene emulator sources and test programs.
[charm.git] / src / langs / bluegene / BlueGene.C
1 // File: BlueGene.C
2
3 #define protected public
4 #include "BlueGene.h"
5
6 #define CYCLES_PER_HOP    5
7 #define CYCLES_PER_CORNER  75
8 #define CYCLE_TIME_FACTOR  0.001  // one cycle = nanosecond = 10^(-3) us  
9
10 CkChareID mainID ;
11 CkArrayID bgArrayID ;
12
13 //Main methods 
14 /*
15  * 'Main' calls 'BgInit', defined in user application for initialization 
16  * of bluegene machine.
17  */
18 Main::Main(CkArgMsg *msg)
19 {
20   args = msg ;
21   mainID = thishandle ;
22
23   if (msg->argc < 6) { 
24     CkAbort("Usage: <program> <x> <y> <z> <numCommTh> <numWorkTh>\n"); 
25   }
26     
27   CreateBgNodeMsg *bgNodeMsg = new CreateBgNodeMsg;
28   numBgX = bgNodeMsg->numBgX = atoi(msg->argv[1]) ;
29   numBgY = bgNodeMsg->numBgY = atoi(msg->argv[2]) ; 
30   numBgZ = bgNodeMsg->numBgZ = atoi(msg->argv[3]) ;
31   bgNodeMsg->numCTh = atoi(msg->argv[4]) ;
32   bgNodeMsg->numWTh = atoi(msg->argv[5]) ;
33
34   CreateBlueGene(bgNodeMsg) ;
35
36   BgInit(this) ;
37   starttime = CkWallTimer();
38   return ;
39 }
40
41 Main::~Main()
42 {
43   CProxy_BgNode bgArray(bgArrayID) ;
44
45   for( int i=0; i<numBgX; i++ )
46   for( int j=0; j<numBgY; j++ )
47   for( int k=0; k<numBgZ; k++ )
48   bgArray(i, j, k).destroy() ;
49 }
50
51 int Main::getNumArgs()
52 {
53   return args->argc ;
54 }
55
56 const char** Main::getArgs()
57 {
58   return (const char**)args->argv ;
59 }
60
61 /*
62  * 'CreateBlueGene' initializes BlueGene machine
63  * Specifies the machine configuration
64  *   - Number of BgNodes in X, Y, and Z dimension
65  *  - Number of Communication threads per BgNode
66  *  - Number of Worker threads per BgNode
67  *  - more: would also include latencies, cycles per hop, cycles per corner, etc
68  */
69 void Main::CreateBlueGene(CreateBgNodeMsg *msg)
70 {
71   bgArrayID = CProxy_BgNode::ckNew() ;
72   CProxy_BgNode bgArray(bgArrayID) ;  
73   for( int i=0; i<msg->numBgX; i++ )
74   for( int j=0; j<msg->numBgY; j++ )
75   for( int k=0; k<msg->numBgZ; k++ )
76   {
77     CreateBgNodeMsg *tempMsg = new CreateBgNodeMsg ;
78     tempMsg->numCTh = msg->numCTh ;
79     tempMsg->numWTh = msg->numWTh ;
80     tempMsg->numBgX = msg->numBgX ;
81     tempMsg->numBgY = msg->numBgY ;
82     tempMsg->numBgZ = msg->numBgZ ;
83     bgArray(i, j, k).insert(tempMsg) ;
84   }
85   bgArray.doneInserting() ;
86   delete msg ;
87 }
88
89 /*
90  * 'Finish' is a simple implementation for informing the system that 
91  * application has finished.
92  * more: should be modified to include quiescense detection: no hurry.
93  */
94 void Main::Finish(void) 
95 {
96   endtime = CkWallTimer();
97   ckout << "Total time : " << (endtime-starttime)*10.0 
98         << " microseconds per ring" << endl;
99   CkExit() ;
100 }
101
102 static void defaultHandler(ThreadInfo *)
103 {
104   CkAbort("BG> Invalid Handler called.\n");
105 }
106
107 //BgNode methods~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
108 /*
109  * 'BgNode' is initialization of a Blue Gene Node/CHIP.
110  *   - Create a (node-private) inBuffer, schedulerQ's, internal tables
111  *  - Creates all communication and worker threads.
112  *  - Calls 'BgNodeInit', user defined application function, where user registers 
113  *    the handlers for this BgNode and may trigger some message as microtasks or may 
114  *    send some messages to other nodes.
115  */
116 BgNode::BgNode(CreateBgNodeMsg *msg)
117 {
118   numCTh = msg->numCTh ;
119   numWTh = msg->numWTh ;
120   numBgX = msg->numBgX ;
121   numBgY = msg->numBgY ;
122   numBgZ = msg->numBgZ ;
123
124   inBuffer     = new InBuffer(this) ;
125   workThQ      = new SchedulerQ(numWTh) ;
126   commForMsg   = new SchedulerQ(numCTh) ;
127   commForWork  = new SchedulerQ(numCTh) ;
128   matchTable   = new PacketMsg*[numCTh+numWTh] ;
129   threadTable  = new CthThread[numCTh+numWTh] ;
130   handlerTable = new BgHandler[MAX_HANDLERS] ;
131   addMsgQ      = new MsgQ;
132
133   for(int i=0; i<MAX_HANDLERS; i++)
134     handlerTable[i] = (BgHandler) defaultHandler;
135   for(int i=0; i<(numCTh+numWTh); i++)
136     matchTable[i] = 0;
137   proxies = new CProxy_BgNode***[numBgX];
138   for(int i=0;i<numBgX;i++) {
139     proxies[i] = new CProxy_BgNode**[numBgY];
140     for(int j=0;j<numBgY;j++) {
141       proxies[i][j] = new CProxy_BgNode*[numBgZ];
142       for(int k=0;k<numBgZ;k++) {
143         proxies[i][j][k] = new CProxy_BgNode(bgArrayID, 
144                                              new CkArrayIndex3D(i,j,k));;
145       }
146     }
147   }
148
149   delete msg ;
150
151   //Create Communication Thread and enter in threadTable and sleep them in commForMsg
152   for(int i=0; i<numCTh; i++)
153   {
154     ThreadInfo *info = new ThreadInfo() ;
155     info->bgNode = this ;
156     info->selfID = i ;
157     CthThread t = CthCreate((CthVoidFn)::startCommTh, (void*)info , 0) ;
158     threadTable[i] = t ;
159     CthAwaken(t) ;
160   }
161
162   //Create worker Thread and enter in threadTable and sleep them in workThQ
163   for(int i=0; i<numWTh; i++)
164   {
165     ThreadInfo *info = new ThreadInfo() ;
166     info->bgNode = this ;
167     info->selfID = i + numCTh ;
168     CthThread t = CthCreate((CthVoidFn)::startWorkTh, (void*)info , 0) ;
169     threadTable[i+numCTh] = t;
170     CthAwaken(t) ;
171   }
172
173   nvData = (void*)BgNodeInit(this) ;
174 }
175
176 BgNode::~BgNode()
177 {
178   delete inBuffer ;
179   delete[] matchTable ;
180   delete[] threadTable ;
181   delete[] handlerTable ;
182   delete workThQ ;
183   delete commForMsg ;
184 }
185
186 void BgNode::registerHandler(int handlerID, BgHandler h) 
187 {
188   if(handlerID >= MAX_HANDLERS)
189     CkAbort("BG> Handler ID exceeded maximum.\n");
190   handlerTable[handlerID] = h;
191 }
192
193 /*
194  * Assign the MicroTask in PacketMsg to a free communication thread of the same BgNode.
195  * If all the communication threads are busy, allocate it to default communication thread '0'.
196  * The communication thread would schedule it.
197  * more: change this default stuff to an appropriate load-balanced-strategy
198  */
199  //more: change it
200 void BgNode::addMessage(PacketMsg *msgPtr, int handlerID, WorkType type)
201 {
202   msgPtr->handlerID  = handlerID ;
203   msgPtr->type = type ;
204
205   //get a communication thread ID, if available, else  enque to added messages
206   int commThID ;
207   if(-1 == (commThID = commForMsg->dequeThread()))
208   {
209     addMsgQ->enq(msgPtr);
210     return;
211   }
212
213   matchTable[commThID] = msgPtr;
214
215   //get thread address and awaken it 
216   CthThread t = threadTable[commThID] ;
217   CthAwaken(t) ;
218 }
219
220 /* 
221  * Put PacketMsg to inBuffer
222  * If inBuffer is Full then sleep for some time and retry.
223  */
224 void BgNode::putMessage(PacketMsg *msgPtr)
225 {
226   inBuffer->putMessage(msgPtr);
227 }
228
229 #define ABS(x) (((x)<0)? -(x) : (x))
230
231 static inline double MSGTIME(int ox, int oy, int oz, int nx, int ny, int nz)
232 {
233   int xd=ABS(ox-nx), yd=ABS(oy-ny), zd=ABS(oz-nz);
234   int ncorners = 2;
235   ncorners -= (xd?1:0 + yd?1:0 + zd?1:0);
236   ncorners = (ncorners<0)?0:ncorners;
237   return ncorners*CYCLES_PER_CORNER + (xd+yd+zd)*CYCLES_PER_HOP;
238 }
239 /*
240  * Send PacketMsg to BgNode[x,y,z]
241  * Set appropriate timestamps:
242  *  sendTime = currTime
243  *      recvTime = estimated time to reach the destination BgNode
244  */
245 void 
246 BgNode::sendPacket(int x, int y, int z, PacketMsg *msgPtr, 
247                    int handlerID, WorkType type)
248 {
249   msgPtr->handlerID = handlerID ;
250   msgPtr->type = type ;
251
252   msgPtr->recvTime = MSGTIME(thisIndex.x,thisIndex.y,thisIndex.z,x,y,z)
253                    + msgPtr->sendTime ;
254
255   proxies[x][y][z]->putMessage(msgPtr) ;
256 }
257
258 /*
259  * Assign the new Message in inBuffer to a free communication thread for scheduling.
260  * If all the communication threads are busy, then leave message in inBuffer and don't worry.
261  */
262 void BgNode::assignMsg()
263 {
264   int commThID = commForMsg->dequeThread() ;
265   
266   if(-1 == commThID)
267   {
268     /* ckout << "leaving assignMsg: didn't find commTh " << endl ; */
269     return ;
270   }
271
272   CthThread t = threadTable[commThID] ;
273   CthAwaken(t) ;
274
275   return ;
276 }
277
278 /*
279  * Start the communication thread as soon as it is created.
280  * The thread calls 'getMessage' on inBuffer to see if there is any 
281  * pending message.
282  *   If any, then get it from inBuffer and schedule it.
283  *  If new message is a small piece of work i.e. type == SMALL_WORK, execute it
284  *      else assign it to a free worker thread
285  *
286  * If there is no pending message in inBuffer, it looks in MatchTable to see if there is
287  * any microtask allocated to it. If any, it schedules it.
288  *  If the microtask is small piece of work then execute it and remove from matchTable 
289  *   else assign it to a free worker thread
290  *
291  * If no message, then sleep until no new message or microtask awakens it.
292  */
293 void BgNode::startCommTh(ThreadInfo *info)
294 {
295   PacketMsg *msgPtr = 0;
296   int selfID = info->selfID;
297   info->currTime = 0 ;
298
299   double tp1, tp2 ;
300   tp1 = BGTimer()*1e6;
301   while(true)
302   {
303     msgPtr = 0; 
304
305     msgPtr = inBuffer->getMessage();
306     if(msgPtr==0) { msgPtr=matchTable[selfID]; matchTable[selfID] = 0; }
307     if(msgPtr==0) { msgPtr=addMsgQ->deq(); }
308       
309     if(msgPtr==0)
310     {
311       //suspend thread
312       tp2 = BGTimer()*1e6;
313       info->currTime += (tp2-tp1) ;
314       commForMsg->enqueThread(selfID) ;
315       CthSuspend() ;
316       tp1 = BGTimer()*1e6;
317       continue ;
318     }
319
320     if(SMALL_WORK==msgPtr->type)
321     {
322       BgHandler handler = handlerTable[msgPtr->handlerID];
323       info->msg    = (void*)msgPtr ; 
324   
325       tp2 = BGTimer()*1e6;
326       info->currTime += (tp2-tp1) ;
327       //make timing adjustments for thread
328       if(msgPtr->recvTime > info->currTime)
329       info->currTime = msgPtr->recvTime ;
330       info->handlerStartTime = tp2 ;
331   
332       tp1 = tp2 ;
333       handler(info) ;
334       continue ;
335     }
336     else if(msgPtr->type==LARGE_WORK)
337     {
338       //get a worker thread ID, if available, else do polling
339       int workThID ; 
340       while(-1 == (workThID = workThQ->dequeThread()))
341       {
342         tp2 = BGTimer()*1e6;
343         info->currTime += (tp2-tp1) ;
344         commForWork->enqueThread(selfID);
345         CthSuspend();
346         tp1 = BGTimer()*1e6;
347       }
348
349       matchTable[workThID] = msgPtr;
350
351       //get thread address and awaken it 
352       CthThread t = threadTable[workThID] ;
353       CthAwaken(t) ;
354       continue ;
355     }
356     else
357       ckout << "Unidentified thread category, error" << endl ;
358   }
359 }
360
361 /*
362  * Start the worker thread as soon as it is created.
363  * It looks in matchTable to see if there is any message or microtask allocated to it.
364  *  If any, then execute it.
365  * If no work, then sleep until communication thread awakens it.
366  */
367 void BgNode::startWorkTh(ThreadInfo *info)
368 {
369   PacketMsg *msgPtr = 0;
370   int selfID = info->selfID;
371   info->currTime = 0 ;
372
373   double tp1, tp2 ;
374   tp1 = BGTimer()*1e6;
375   while(true)
376   {
377     msgPtr = 0 ;
378
379     msgPtr = matchTable[selfID];
380     matchTable[selfID] = 0;
381
382     if(NULL==msgPtr) 
383     {
384       tp2 = BGTimer()*1e6;
385       info->currTime += (tp2-tp1) ;
386       workThQ->enqueThread(selfID) ;
387       int commID = commForWork->dequeThread();
388       if (commID!=(-1))
389         CthAwaken(threadTable[commID]);
390       CthSuspend() ;
391       tp1 = BGTimer()*1e6;
392       continue ;
393     }
394
395     //get handler for this message and execute it
396     BgHandler handler = handlerTable[msgPtr->handlerID] ;  
397     info->msg    = (void*)msgPtr ; 
398
399     tp2 = BGTimer()*1e6;
400     info->currTime += (tp2-tp1) ;
401     //make timing adjustments for thread
402     if(msgPtr->recvTime > info->currTime)
403       info->currTime = msgPtr->recvTime ;
404     info->handlerStartTime = tp2 ;
405
406     tp1 = tp2 ;
407     handler(info) ;
408   }
409 }
410
411 void BgNode::getXYZ(int& i, int& j, int& k)
412 {
413   i = thisIndex.x ; j = thisIndex.y ; k = thisIndex.z ;
414 }
415
416 void BgNode::finish(void)
417 {
418   //more: should I delete all the node components here
419   CProxy_Main main(mainID) ;
420   main.Finish() ;
421 }
422
423 /***************************************************************************/
424 /*
425  * Convere system requires the startfunction of threads should be in C
426  */
427 extern "C" void startCommTh(ThreadInfo *info) 
428 {
429   (info->bgNode)->startCommTh(info) ;
430 }
431
432 extern "C" void startWorkTh(ThreadInfo *info)
433 {
434   (info->bgNode)->startWorkTh(info) ;
435 }
436
437 #include "BlueGene.def.h"