9777e36c46d8f63de7924d19c3cd7e9677ae5574
[charm.git] / src / arch / lapi / machine.c
1 /*****************************************************************************
2 LAPI version of machine layer
3 Based on the template machine layer
4
5 Developed by
6 Filippo Gioachin   03/23/05
7 Chao Mei 01/28/2010
8 ************************************************************************/
9
10 #include <lapi.h>
11
12 #include "converse.h"
13
14 #include <assert.h>
15 #include <errno.h>
16
17 /*Support for ++debug: */
18 #include <unistd.h> /*For getpid()*/
19 #include <stdlib.h> /*For sleep()*/
20
21 #include "machine.h"
22
23 /* Read the following carefully before building the machine layer on LAPI */
24 /* =========BEGIN OF EXPLANATION OF MACRO USAGE=============*/
25 /**
26  * 1. non-SMP mode: 
27  *   CMK_SMP = 0; 
28  *   CMK_PCQUEUE_LOCK = 1; (could be removed if memory fence and atomic ops are used) 
29  *  
30  *   (The following two could be disabled to reduce the overhead of machine layer) 
31  *     ENSURE_MSG_PAIRORDER = 0|1; 
32  *     ENABLE_CONVERSE_QD = 0|1; 
33  *  
34  *   (If ENSURE_MSG_PAIRORDER is 1, then setting DECOUPLE_BCAST_PROCESS to 1
35  *   will make the msg seqno increase at step of 1 w/o data race;
36  *     DECOUPLE_BCAST_PROCESS = 0|1;
37  * =========================================================== 
38  * 2. SMP mode without comm thd:
39  *    CMK_SMP = 1;
40  *    CMK_PCQUEUE_LOCK = 1;
41  *    CMK_SMP_NO_COMMTHD = 1;
42  *  
43  *    ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
44  *  
45  *    DECOUPLE_BCAST_PROCESS has same options as in non-SMP mode;
46  * =========================================================== 
47  *  3. SMP mode with comm thd:
48  *     CMK_SMP = 1;
49  *     CMK_PCQUEUE_LOCK = 1;
50  *     CMK_SMP_NO_COMMTHD = 0;
51  *  
52  *     ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
53  *  
54  *     (The following must be set with 1 as bcast msg is dealt with in comm thd!)
55  *     DECOUPLE_BCAST_PROCESS = 1;
56  *  ===========================================================
57  *  
58  *  Assumptions we made in different mode:
59  *  1. non-SMP:
60  *     a) imm msgs are processed when the proc is idle, and periodically. They should
61  *        never be processed in the LAPI thread;
62  *     b) forwarding msgs could be done on the proc or in the internal LAPI
63  *        completion handler threads;
64  *  2. SMP w/o comm thd:
65  *     a) same with non-SMP a)
66  *     b) forwarding bcast msgs could be done on proc whose rank=0;
67  *        (enable DECOUPLE_BCAST_PROCESS)  or in internal LAPI completion
68  *        handler threads;
69  *     c) the destination rank of proc-level bcast msg is always 0;
70  *  3. SMP w/ comm thd:
71  *     a) imm msgs are processed in comm thd;
72  *     b) forwarding bcast msgs is done in comm thd;
73  *     c) same with 2 c)
74  *  
75  */
76 /* =========END OF EXPLANATION OF MACRO USAGE=============*/
77
78 #if CMK_SMP
79 #define CMK_PCQUEUE_LOCK 1
80 #else
81 /** 
82  *  In non-smp case: the LAPI completion handler thread will
83  *  also access the proc's recv queue (a PCQueue), so the queue
84  *  needs to be protected. The number of producers equals the
85  *  #completion handler threads, while there's only one consumer
86  *  for the queue. Right now, the #completion handler threads is
87  *  set to 1, so the atomic operation for PCQueue should be
88  *  achieved via memory fence. --Chao Mei
89  */
90
91 /* Redefine CmiNodeLocks only for PCQueue data structure */
92 #define CmiNodeLock CmiNodeLock_nonsmp
93 #undef CmiCreateLock
94 #undef CmiLock
95 #undef CmiUnlock
96 #undef CmiTryLock
97 #undef CmiDestroyLock
98 typedef pthread_mutex_t *CmiNodeLock_nonsmp;
99 CmiNodeLock CmiCreateLock(){
100   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));  
101   pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
102   return lk;
103 }
104 #define CmiLock(lock) (pthread_mutex_lock(lock))
105 #define CmiUnlock(lock) (pthread_mutex_unlock(lock))
106 #define CmiTryLock(lock) (pthread_mutex_trylock(lock))
107 void CmiDestroyLock(CmiNodeLock lock){
108     pthread_mutex_destroy(lock);
109     free(lock);
110 }
111 #define CMK_PCQUEUE_LOCK 1
112 #endif
113 #include "pcqueue.h"
114
115 /** 
116  *  The converse qd is rarely used in current charm apps, so the
117  *  counter for converse qd could be disabled for max
118  *  performance. --Chao Mei
119  */
120 #define ENABLE_CONVERSE_QD 1
121
122 #if CMK_SMP
123 /** 
124  *  The macro defines whether to have a comm thd to offload some
125  *  work such as processing immdiate messages, forwarding
126  *  broadcast messages etc. This macro should be defined before
127  *  including "machine-smp.c".
128  *  --Chao Mei
129  */
130 #define CMK_SMP_NO_COMMTHD 0
131 #if CMK_SMP_NO_COMMTHD
132 int Cmi_commthread = 0;
133 #else
134 int Cmi_commthread = 1;
135 #endif
136
137 #endif
138
139 /** 
140  *  Enable this macro will offload the broadcast relay from the
141  *  internal completion handler thread. This will make the msg
142  *  seqno free of data-race. In SMP mode with comm thread where
143  *  comm thread will forward bcast msgs, this macro should be
144  *  enabled.
145  */
146 #define DECOUPLE_BCAST_PROCESS 1
147
148 /** 
149  * #####REGARDING IN-ORDER MESSAGE DELIVERY BETWEEN A PAIR OF 
150  * PROCESSORS#####: 
151  *  
152  * Since the lapi doesn't guarantee the order of msg delivery 
153  * (via network) between a pair of processors, we need to ensure 
154  * this order via msg seq no and a window sliding based msg 
155  * receiving scheme. So two extra fields are added to the basic 
156  * msg header: srcPe and seqno. For node messages, we process it 
157  * like a msg to be delivered to the first proc (rank=0) of that 
158  * node. 
159  *  
160  * BTW: The in-order delivery between two processors in the same
161  * node is guaranteed in the SMP mode as the msg transfer 
162  * doesn't go through LAPI. 
163  *  
164  * The msg transferred through LAPI (via network) is always 
165  * delivered to the first proc (whose rank is 0) on that node! 
166  *  
167  * --Chao Mei 
168  */
169 #define ENSURE_MSG_PAIRORDER 1
170
171 #if ENSURE_MSG_PAIRORDER
172
173 #define MAX_MSG_SEQNO 65535
174 /* MAX_WINDOW_SIZE should be smaller than MAX_MSG_SEQNO, and MAX(unsigned char) */
175 #define MAX_WINDOW_SIZE 128
176 #define INIT_WINDOW_SIZE 8
177
178 /* The lock to ensure the completion handler (PumpMsgsComplete) is thread-safe */
179 CmiNodeLock cmplHdlrThdLock = NULL;
180
181 /** 
182  *  expectedMsgSeqNo is an int array of size "#procs". It tracks
183  *  the expected seqno recved from other procs to this proc.
184  *  
185  *  nextMsgSeqNo is an int array of size "#procs". It tracks
186  *  the next seqno of the msg to be sent from this proc to other
187  *  procs.
188  *  
189  *  oooMsgBuffer is an array of sizeof(void **)*(#procs), each
190  * element (created on demand) points to a window (array) size 
191  * of CUR_WINDOW_SIZE which buffers the out-of-order incoming 
192  * messages (a (void *) array) 
193  *  
194  * oooMaxOffset indicates the maximum offset of the ooo msg 
195  * ahead of the expected msg. The offset begins with 1, i.e., 
196  * (offset-1) is the index of the ooo msg in the window 
197  * (oooMsgBuffer) 
198  *  
199  *  --Chao Mei
200  */
201
202 typedef struct MsgOrderInfoStruct{
203     /* vars used on sender side */
204     int *nextMsgSeqNo;
205
206     /* vars used on recv side */
207     int *expectedMsgSeqNo;        
208     void ***oooMsgBuffer;
209     unsigned char *oooMaxOffset;
210     unsigned char *CUR_WINDOW_SIZE;
211 }MsgOrderInfo;
212
213 /** 
214  *  broadcast msgs and p2p msgs uses different tracks of seq no
215  *  as is the case with "net" layers.
216  */
217 CpvDeclare(MsgOrderInfo, p2pMsgSeqInfo);
218 CpvDeclare(MsgOrderInfo, bcastMsgSeqInfo);
219
220
221 #endif
222
223 /*
224     To reduce the buffer used in broadcast and distribute the load from
225   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
226   spanning tree broadcast algorithm.
227     This will use the fourth short in message as an indicator of spanning tree
228   root.
229 */
230
231 #undef CMK_BROADCAST_SPANNING_TREE
232 #define CMK_BROADCAST_SPANNING_TREE 0
233
234 /*#define BROADCAST_SPANNING_FACTOR        CMK_SPANTREE_MAXSPAN*/
235 #define BROADCAST_SPANNING_FACTOR   2
236
237 #undef CMK_BROADCAST_HYPERCUBE
238 #define CMK_BROADCAST_HYPERCUBE     1
239
240 /**
241  * The broadcast root of a msg. 
242  * 0: indicate a non-broadcast msg 
243  *  
244  * >0: indicate a proc-level broadcast msg ("root-1" indicates 
245  * the proc that starts a broadcast)
246  *  
247  * <0: indicate a node-level broadcast msg ("-root-1" indicates 
248  * the node that starts a broadcast) 
249  *  
250  * On BG/P, we have a separate broadcast queue on each 
251  * processor. This will allow us to use the separate thread 
252  * (comm thread) to offload the broadcast operation from worker 
253  * threads --Chao Mei 
254  */
255 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
256 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
257 /* The actual msg size including the msg header */
258 #define CMI_MSG_SIZE(msg)                ((CmiMsgHeaderBasic *)msg)->size
259
260 #define CMI_MSG_SRCPE(msg)               ((CmiMsgHeaderBasic *)msg)->srcpe
261 #define CMI_MSG_SEQNO(msg)               ((CmiMsgHeaderBasic *)msg)->seqno
262
263 CpvDeclare(unsigned , networkProgressCount);
264 int networkProgressPeriod = 1000;
265
266 static int lapiDebugMode=0;
267 CsvDeclare(int, lapiInterruptMode);
268
269 static void ConverseRunPE(int everReturn);
270 static void PerrorExit(const char *msg);
271
272 static int Cmi_nodestart;   /* First processor in this node - stupid need due to machine-smp.h that uses it!!  */
273
274 static volatile int commThdExit = 0;
275
276 #include "machine-smp.c"
277
278
279 /* Variables describing the processor ID */
280
281 /* non-smp mode */
282 #if CMK_SHARED_VARS_UNAVAILABLE
283 /* Should the non-smp also needs the concept of node which equals to processor -Chao Mei */
284 int _Cmi_mype;
285 int _Cmi_numpes;
286 int _Cmi_myrank;
287
288 void CmiMemLock() {}
289 void CmiMemUnlock() {}
290
291 static struct CmiStateStruct Cmi_state;
292
293 #define CmiGetState() (&Cmi_state)
294 #define CmiGetStateN(n) (&Cmi_state)
295
296 void CmiYield() {
297     sleep(0);
298 }
299
300 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
301
302 int _Cmi_numpes;
303 int _Cmi_mynodesize;
304 int _Cmi_mynode;
305 int _Cmi_numnodes;
306
307 int CmiMyPe(void) {
308     return CmiGetState()->pe;
309 }
310
311 int CmiMyRank(void) {
312     return CmiGetState()->rank;
313 }
314
315 int CmiNodeFirst(int node) {
316     return node*_Cmi_mynodesize;
317 }
318 int CmiNodeSize(int node)  {
319     return _Cmi_mynodesize;
320 }
321
322 int CmiNodeOf(int pe)      {
323     return (pe / _Cmi_mynodesize);
324 }
325 int CmiRankOf(int pe)      {
326     return (pe % _Cmi_mynodesize);
327 }
328 #endif
329
330 CpvDeclare(void*, CmiLocalQueue);
331
332 #if DECOUPLE_BCAST_PROCESS
333 CpvDeclare(PCQueue, procBcastQ);
334 #if CMK_NODE_QUEUE_AVAILABLE
335 CsvDeclare(PCQueue, nodeBcastQ);
336 #endif
337 #endif
338
339 #if CMK_NODE_QUEUE_AVAILABLE
340 #define DGRAM_NODEMESSAGE   (0x7EFB)
341 #define NODE_BROADCAST_OTHERS (-1)
342 #define NODE_BROADCAST_ALL    (-2)
343 #endif
344
345 /* The way to read this macro
346  * "routine args" becomes a function call inside the parameters of check_lapi_err,
347  * and it returns a int as returnCode;
348  * "#routine" turns the "routine" as a string
349  * __LINE__ is the line number in the source file
350  * -Chao Mei
351  */
352 #define check_lapi(routine,args) \
353         check_lapi_err(routine args, #routine, __LINE__);
354
355 static void check_lapi_err(int returnCode,const char *routine,int line) {
356     if (returnCode!=LAPI_SUCCESS) {
357         char errMsg[LAPI_MAX_ERR_STRING];
358         LAPI_Msg_string(returnCode,errMsg);
359         fprintf(stderr,"Fatal LAPI error while executing %s at %s:%d\n"
360                 "  Description: %s\n", routine, __FILE__, line, errMsg);
361         CmiAbort("Fatal LAPI error");
362     }
363 }
364
365 static void lapi_err_hndlr(lapi_handle_t *hndl, int *error_code, 
366                             lapi_err_t *err_type, int *task_ID, int *src){
367     char errstr[LAPI_MAX_ERR_STRING];
368     LAPI_Msg_string(*error_code, errstr);
369     fprintf(stderr, "ERROR IN LAPI: %s for task %d at src %d\n", errstr, *task_ID, *src);
370     LAPI_Term(*hndl);
371     exit(1);
372 }
373
374 /**
375  * The lapiContext stands for the lapi context for a single lapi 
376  * task. And inside one lapi task, only one lapi context could 
377  * be created via lapi_init. In SMP mode, this context is 
378  * created by proc of rank 0, and then it is shared among all 
379  * cores on a node (threads) --Chao Mei 
380  * 
381  */
382 static lapi_handle_t lapiContext;
383 static lapi_long_t lapiHeaderHandler = 1;
384
385 /**
386  * Note on broadcast functions: 
387  * The converse QD may be wrong if using spanning tree or 
388  * hypercube schemes to send messages --Chao Mei 
389  */
390 void SendMsgToPeers(int size, char *msg, int includeSelf);
391 #if ENSURE_MSG_PAIRORDER
392 void SendSpanningChildren(int size, char *msg, int srcPe, int *seqNoArr);
393 void SendHypercube(int size, char *msg, int srcPe, int *seqNoArr);
394 #else
395 void SendSpanningChildren(int size, char *msg);
396 void SendHypercube(int size, char *msg);
397 #endif
398
399 #if CMK_NODE_QUEUE_AVAILABLE
400 /** 
401  *  The sending schemes of the following two functions are very
402  *  similar to its corresponding proc-level functions
403  */
404 void SendSpanningChildrenNode(int size, char *msg);
405 void SendHypercubeNode(int size, char *msg);
406 #endif
407
408 /**
409  * There is a function "CkCopyMsg" in the charm++ level, which
410  * considers more than mere memcpy as there could be var size msg
411  * --Chao Mei
412  */
413 char *CopyMsg(char *msg, int len) {
414     char *copy = (char *)CmiAlloc(len);
415     if (!copy)
416         fprintf(stderr, "Out of memory\n");
417     memcpy(copy, msg, len);
418     return copy;
419 }
420  
421 CsvDeclare(CmiNodeState, NodeState);
422
423 #if CMK_IMMEDIATE_MSG
424 #include "immediate.c"
425 #endif
426
427 /** 
428  *  This function will be never called in the comm thd!
429  *  It is only used in the non-SMP mode, or in SMP mode w/o comm
430  *  thd! --Chao Mei
431  */
432 static void AdvanceCommunication(){
433 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
434
435     if(!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
436 #if CMK_IMMEDIATE_MSG
437     /** 
438      * Immediate msgs are handled in CmiPushNode and 
439      * CmiSendNodeSelf, CmiPushPe and CmiSendSelf in the non-SMP 
440      * case, but in SMP case, those four functions could be called 
441      * in the completition handler where "CmiMyPe(), CmiMyRank()" 
442      * will be wrong as the "CmiGetState()" will not return a right 
443      * proc-specific CmiState!! So immediate messages are handled 
444      * when proc is idle. This may cause a big delay for processing 
445      * immdiate messages in SMP mode if there's not a dedicated 
446      * communication thread. 
447      *  
448      * Even in non-SMP, inside imm msg handlers, array proxy 
449      * messages could be sent. If handled in LAPI internal 
450      * completion thread, it will cause data-racing problems for 
451      * data structures that maintain charm array info. So it needs 
452      * to be handled in the context of a proc. 
453      *  
454      * -Chao Mei 
455      */
456     MACHSTATE1(2, "[%p] Handling Immediate Message begin{",CmiGetState());
457     CmiHandleImmediate();
458     MACHSTATE1(2, "[%p] Handling Immediate Message end}",CmiGetState());
459 #endif
460
461 #endif
462 }
463
464 /* non-smp CmiStartThreads. -Chao Mei */
465 #if CMK_SHARED_VARS_UNAVAILABLE
466 /* To conform with the call made in SMP mode */
467 static void CmiStartThreads(char **argv) {
468     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
469 }
470 #endif
471
472 /* ===== Beginging of functions regarding ensure in-order msg delivery ===== */
473 #if ENSURE_MSG_PAIRORDER
474
475 /**
476  * "setNextMsgSeqNo" actually sets the current seqno, the 
477  * "getNextMsgSeqNo" will increment the seqno, i.e., 
478  * "getNextMgSeqNo" returns the next seqno based on the previous 
479  * seqno stored in the seqno array. 
480  * --Chao Mei 
481  */
482 static int getNextMsgSeqNo(int *seqNoArr, int destPe){
483     int ret = seqNoArr[destPe];
484     ret++;
485     return ret;
486 }
487 static void setNextMsgSeqNo(int *seqNoArr, int destPe, int val){
488     /* the seq no. may fast-forward to a new round (i.e., starting from 1 again!) */
489     if(val>=MAX_MSG_SEQNO) val -= MAX_MSG_SEQNO;
490     seqNoArr[destPe] = val;
491 }
492
493 #define getNextExpectedMsgSeqNo(seqNoArr,srcPe) getNextMsgSeqNo(seqNoArr, srcPe)
494 #define setNextExpectedMsgSeqNo(seqNoArr,srcPe,val) setNextMsgSeqNo(seqNoArr, srcPe, val)
495
496 #endif
497 /* ===== End of functions regarding ensure in-order msg delivery ===== */
498
499 /* ===========CmiPushPe and CmiPushNode============*/
500 /* Add a message to this processor's receive queue, pe is a rank */
501 void CmiPushPE(int pe,void *msg) {
502     CmiState cs = CmiGetStateN(pe);
503     MACHSTATE3(3,"Pushing message(%p) into rank %d's queue %p {",msg,pe,(cs->recv));
504
505 #if CMK_IMMEDIATE_MSG
506     if (CmiIsImmediate(msg)) {
507         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
508         CMI_DEST_RANK(msg) = pe;
509         CmiPushImmediateMsg(msg);
510         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());
511         return;
512     }
513 #endif
514
515     /* Note atomicity is guaranteed inside pcqueue data structure --Chao Mei */
516     PCQueuePush(cs->recv,msg);
517
518     CmiIdleLock_addMessage(&cs->idle);
519     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
520 }
521
522 #if CMK_NODE_QUEUE_AVAILABLE
523 /*Add a message to this processor's receive queue */
524 /*Note: CmiPushNode is essentially same with CimSendNodeSelf */
525 static void CmiPushNode(void *msg) {    
526     MACHSTATE1(3,"[%p] Pushing message into NodeRecv queue",CmiGetState());
527
528 #if CMK_IMMEDIATE_MSG
529     if (CmiIsImmediate(msg)) {
530         MACHSTATE1(3, "[%p] Push Immediate Message begin {",CmiGetState());
531         CMI_DEST_RANK(msg) = 0;
532         CmiPushImmediateMsg(msg);
533         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());
534         return;
535     }
536 #endif
537     /* CmiNodeRecvLock may not be needed  --Chao Mei*/
538     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
539     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
540     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
541
542     CmiState cs=CmiGetStateN(0);
543     CmiIdleLock_addMessage(&cs->idle);
544
545     MACHSTATE(3,"Pushing message into NodeRecv queue {");
546 }
547 #endif
548
549 /* ======Beginning of helper functions for processing an incoming (network) message ======*/
550 /* Process a proc-level broadcast message */
551 static void ProcessProcBroadcastMsg(char *msg){
552     int nbytes = CMI_MSG_SIZE(msg);    
553 #if ENSURE_MSG_PAIRORDER
554     MACHSTATE3(2,"[%p] the broadcast msg is from pe=%d with seq no=%d", CmiGetState(), CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
555     #if CMK_BROADCAST_SPANNING_TREE    
556     SendSpanningChildren(nbytes, msg, CmiNodeFirst(CmiMyNode()), CpvAccessOther(bcastMsgSeqInfo, 0).nextMsgSeqNo);
557     #elif CMK_BROADCAST_HYPERCUBE    
558     SendHypercube(nbytes, msg, CmiNodeFirst(CmiMyNode()), CpvAccessOther(bcastMsgSeqInfo, 0).nextMsgSeqNo);
559     #endif
560 #else
561     #if CMK_BROADCAST_SPANNING_TREE
562     SendSpanningChildren(nbytes, msg);
563     #elif CMK_BROADCAST_HYPERCUBE
564     SendHypercube(nbytes, msg);
565     #endif
566 #endif                     
567 #if CMK_SMP
568     SendMsgToPeers(nbytes, msg, 1);                   
569     CmiFree(msg);
570 #else
571     /* nonsmp case */
572     CmiPushPE(0, msg);
573 #endif
574 }
575
576 #if CMK_NODE_QUEUE_AVAILABLE
577 /* Process a node-level broadcast message */
578 static void ProcessNodeBroadcastMsg(char *msg){
579     int nbytes = CMI_MSG_SIZE(msg);
580 #if CMK_BROADCAST_SPANNING_TREE
581     SendSpanningChildrenNode(nbytes, msg);
582 #elif CMK_BROADCAST_HYPERCUBE
583     SendHypercubeNode(nbytes, msg);
584 #endif            
585     CmiPushNode(msg);
586 }
587 #endif
588
589 /* Pull msgs from two queues, this function should not be called from cmpl_hdlr thread */
590 static void ProcessBroadcastMsg(int pullRank){
591     char *msg;
592     do{
593         msg = PCQueuePop(CpvAccessOther(procBcastQ, pullRank));
594         if(msg) {
595             MACHSTATE2(4, "[%p]: process a proc-level bcast msg %p begin{", CmiGetState(), msg);
596             ProcessProcBroadcastMsg(msg);
597             MACHSTATE2(4, "[%p]: process a proc-level bcast msg %p end}", CmiGetState(), msg);
598         }else{ 
599             break;
600         }
601     }while (1);
602 #if CMK_NODE_QUEUE_AVAILABLE
603     do{
604         msg = PCQueuePop(CsvAccess(nodeBcastQ));
605         if(msg) {
606             MACHSTATE2(4, "[%p]: process a node-level bcast msg %p begin{", CmiGetState(), msg);
607             ProcessNodeBroadcastMsg(msg);
608             MACHSTATE2(4, "[%p]: process a node-level bcast msg %p end}", CmiGetState(), msg);
609         }else{ 
610             break;
611         }
612     }while (1);    
613 #endif
614 }
615
616
617 #if ENSURE_MSG_PAIRORDER
618 /* return 1 if this msg is an out-of-order incoming message */
619
620 /**
621  * Returns 1 if this "msg" is an out-of-order message, or 
622  * this "msg" is a late message which triggers the process 
623  * of all buffered ooo msgs. 
624  * --Chao Mei 
625  */
626 static int CheckMsgInOrder(char *msg, MsgOrderInfo *info){
627     int srcpe, destrank; 
628     int incomingSeqNo, expectedSeqNo;
629     int curOffset, maxOffset;
630     int i, curWinSize;
631     void **destMsgBuffer = NULL;
632
633     /* numMsg is the number of msgs to be processed in this buffer*/
634     /* Reason to have this extra copy of msgs to be processed: Reduce the atomic granularity */
635     void **toProcessMsgBuffer;
636     int numMsgs = 0;    
637  
638     srcpe = CMI_MSG_SRCPE(msg);
639     destrank = CMI_DEST_RANK(msg);  
640     incomingSeqNo = CMI_MSG_SEQNO(msg);
641     
642     CmiLock(cmplHdlrThdLock);
643
644     expectedSeqNo = getNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe);
645     if(expectedSeqNo == incomingSeqNo){
646         /* Two cases: has ooo msg buffered or not */
647         maxOffset = (info->oooMaxOffset)[srcpe];
648         if(maxOffset>0) {
649             MACHSTATE1(4, "Processing all buffered ooo msgs (maxOffset=%d) including the just recved begin {", maxOffset);
650             curWinSize = info->CUR_WINDOW_SIZE[srcpe];
651             toProcessMsgBuffer = malloc((curWinSize+1)*sizeof(void *));
652             /* process the msg just recved */
653             toProcessMsgBuffer[numMsgs++] = msg;            
654             /* process the buffered ooo msg until the first empty slot in the window */
655             destMsgBuffer = (info->oooMsgBuffer)[srcpe];                       
656             for(curOffset=0; curOffset<maxOffset; curOffset++) {
657                 char *curMsg = destMsgBuffer[curOffset];
658                 if(curMsg == NULL){
659                     CmiAssert(curOffset!=(maxOffset-1));                                        
660                     break;
661                 }
662                 toProcessMsgBuffer[numMsgs++] = curMsg;
663                 destMsgBuffer[curOffset] = NULL;
664             }            
665             /* Update expected seqno, maxOffset and slide the window */
666             if(curOffset < maxOffset) {
667                 int i;
668                 /** 
669                  * now, the seqno of the next to-be-recved msg should be 
670                  * "expectedSeqNo+curOffset+1" as the seqno of the just 
671                  * processed msg is "expectedSeqNo+curOffset. We need to slide 
672                  * the msg buffer window from "curOffset+1" because the first 
673                  * element of the buffer window should always points to the ooo 
674                  * msg that's 1 in terms of seqno ahead of the next to-be-recved 
675                  * msg. --Chao Mei 
676                  */                
677                 
678                 /* moving [curOffset+1, maxOffset) to [0, maxOffset-curOffset-1) in the window */
679                 /* The following two loops could be combined --Chao Mei */
680                 for(i=0; i<maxOffset-curOffset-1; i++){
681                     destMsgBuffer[i] = destMsgBuffer[curOffset+i+1];
682                 }
683                 for(i=maxOffset-curOffset-1; i<maxOffset; i++) {
684                     destMsgBuffer[i] = NULL;
685                 }
686                 (info->oooMaxOffset)[srcpe] = maxOffset-curOffset-1;
687                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+curOffset);
688             }else{
689                 /* there's no remaining buffered ooo msgs */
690                 (info->oooMaxOffset)[srcpe] = 0;
691                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+maxOffset);
692             }
693
694             CmiUnlock(cmplHdlrThdLock);
695                         
696             /* Process the msgs */
697             for(i=0; i<numMsgs; i++){
698                 char *curMsg = toProcessMsgBuffer[i];                
699                 if(CMI_BROADCAST_ROOT(curMsg)>0) {
700                                         
701                 #if DECOUPLE_BCAST_PROCESS
702                     PCQueuePush(CpvAccessOther(procBcastQ, 0), curMsg);
703                 #else
704                     ProcessProcBroadcastMsg(curMsg);
705                 #endif
706
707                 }else{
708                     CmiPushPE(CMI_DEST_RANK(curMsg), curMsg);
709                 }
710             }
711
712             free(toProcessMsgBuffer);
713
714             MACHSTATE1(4, "Processing all buffered ooo msgs (actually processed %d) end }", curOffset);
715             /** 
716              * Since we have processed all buffered ooo msgs including 
717              * this just recved one, 1 should be returned so that this 
718              * msg no longer needs processing 
719              */
720             return 1;
721         }else{
722             /* An expected msg recved without any ooo msg buffered */
723             MACHSTATE1(4, "Receiving an expected msg with seqno=%d\n", incomingSeqNo);
724             setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo);
725
726             CmiUnlock(cmplHdlrThdLock);
727             return 0;
728         }        
729     }
730
731     MACHSTATE2(4, "Receiving an out-of-order msg with seqno=%d, but expect seqno=%d", incomingSeqNo, expectedSeqNo);
732     curWinSize = info->CUR_WINDOW_SIZE[srcpe];
733     if((info->oooMsgBuffer)[srcpe]==NULL) {        
734         (info->oooMsgBuffer)[srcpe] = malloc(curWinSize*sizeof(void *));
735         memset((info->oooMsgBuffer)[srcpe], 0, curWinSize*sizeof(void *));
736     }
737     destMsgBuffer = (info->oooMsgBuffer)[srcpe];
738     curOffset = incomingSeqNo - expectedSeqNo;
739     maxOffset = (info->oooMaxOffset)[srcpe];
740     if(curOffset<0) {
741         /* It's possible that the seqNo starts with another round (exceeding MAX_MSG_SEQNO) with 1 */
742         curOffset += MAX_MSG_SEQNO;
743     }
744     if(curOffset > curWinSize) {
745         int newWinSize;
746         if(curOffset > MAX_WINDOW_SIZE) {
747             CmiAbort("Exceeding the MAX_WINDOW_SIZE!\n");
748         }
749         newWinSize = ((curOffset/curWinSize)+1)*curWinSize;
750         /*CmiPrintf("[%d]: WARNING: INCREASING WINDOW SIZE FROM %d TO %d\n", CmiMyPe(), curWinSize, newWinSize);*/
751         (info->oooMsgBuffer)[srcpe] = malloc(newWinSize*sizeof(void *));
752         memset((info->oooMsgBuffer)[srcpe], 0, newWinSize*sizeof(void *));
753         memcpy((info->oooMsgBuffer)[srcpe], destMsgBuffer, curWinSize*sizeof(void *));
754         info->CUR_WINDOW_SIZE[srcpe] = newWinSize;
755         free(destMsgBuffer);
756         destMsgBuffer = (info->oooMsgBuffer)[srcpe];
757     }    
758     CmiAssert(destMsgBuffer[curOffset-1] == NULL);
759     destMsgBuffer[curOffset-1] = msg;
760     if(curOffset > maxOffset) (info->oooMaxOffset)[srcpe] = curOffset;
761
762     CmiUnlock(cmplHdlrThdLock);
763     return 1;
764 }
765
766 #endif
767 /* ======End of helper functions for processing an incoming (network) message ======*/
768
769 /* ======Begining of lapi callbacks such as the completion handler on the sender and recver side ======*/
770 /** 
771   * lapi completion handler on the recv side. It's responsible to push messages 
772   * to the destination proc or relay broadcast messages. --Chao Mei 
773   *  
774   * Note: The completion handler could be executed on any cores within a node ??? 
775   * So in SMP mode when there's a comm thread, the completion handler should be carefully 
776   * dealt with. 
777   *  
778   * Given lapi also provides an internal lapi thread to deal with network progress which 
779   * will call this function (???), we should be careful with the following situations: 
780   * 1) non SMP mode, with interrupt (lapi internal completion thread) 
781   * 2) non SMP mode, with polling (machine layer is responsible for network progress) 
782   * 3) SMP mode, no comm thread, with polling 
783   * 4) SMP mode, no comm thread, with interrupt 
784   * 5) SMP mode, with comm thread, with polling (not yet implemented, comm server is empty right now)
785   * 6) SMP mode, with comm thread, with interrupt?? 
786   *  
787   * Currently, SMP mode without comm thread is undergoing implementation. 
788   *  
789   * This function is executed by LAPI internal threads. It seems that the number of internal 
790   * completion handler threads could vary during the program. LAPI adaptively creates more 
791   * threads if there are more outstanding messages!!!! This means pcqueue needs protection 
792   * even in the nonsmp case!!!!
793   *  
794   * --Chao Mei 
795   */
796 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info) {
797     int i;
798     char *msg = am_info;
799     int broot, destrank;
800
801     MACHSTATE3(2,"[%p] PumpMsgsComplete with msg %p (isImm=%d) begin {",CmiGetState(), msg, CmiIsImmediate(msg));    
802 #if ENSURE_MSG_PAIRORDER
803     MACHSTATE3(2,"msg %p info: srcpe=%d, seqno=%d", msg, CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
804 #endif
805     /**
806      * First, we check if the msg is a broadcast msg via spanning 
807      * tree. If it is, it needs to call SendSpanningChildren to 
808      * relay the broadcast, and then send the msg to every cores on 
809      * this node.
810      *  
811      * After the first check, we deal with normal messages. 
812      * --Chao Mei
813      */
814 /* It's the right place to relay the broadcast message */
815     /**
816      * 1. For in-order delivery, because this is the handler for 
817      * receiving a message, and we assume the cross-network msgs are 
818      * always delivered to the first proc (rank 0) of this node, we 
819      * select the srcpe of the bcast msgs and the next msg seq no 
820      * correspondingly. 
821      *  
822      * 2. TODO: checking the in-order delivery of p2p msgs!! 
823      *  
824      * --Chao Mei 
825      */
826 #if ENSURE_MSG_PAIRORDER 
827     broot = CMI_BROADCAST_ROOT(msg);
828     destrank = CMI_DEST_RANK(msg);
829     /* Only check proc-level msgs */
830     if (broot>=0
831 #if CMK_NODE_QUEUE_AVAILABLE
832         && destrank != DGRAM_NODEMESSAGE
833 #endif
834     )
835     {
836         MsgOrderInfo *info;        
837         if(broot>0){
838             info = &CpvAccessOther(bcastMsgSeqInfo, destrank);
839             MACHSTATE1(2, "Check msg in-order for bcast msg %p", msg);
840         } else {
841             info = &CpvAccessOther(p2pMsgSeqInfo, destrank);
842             MACHSTATE1(2, "Check msg in-order for p2p msg %p", msg);
843         }
844         if(CheckMsgInOrder(msg,info)) {
845             MACHSTATE(2,"} PumpMsgsComplete end ");
846             return;
847         }
848     }
849 #endif    
850
851 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
852     if (CMI_BROADCAST_ROOT(msg)>0) {
853         MACHSTATE2(2,"[%p] Recved a proc-level broadcast msg %p",CmiGetState(), msg);     
854                 
855     #if DECOUPLE_BCAST_PROCESS
856         PCQueuePush(CpvAccessOther(procBcastQ, 0), msg);
857     #else
858         ProcessProcBroadcastMsg(msg);
859     #endif
860
861         MACHSTATE(2,"} PumpMsgsComplete end ");
862         return;
863     }
864
865 #if CMK_NODE_QUEUE_AVAILABLE
866     if(CMI_BROADCAST_ROOT(msg) < 0) {
867         MACHSTATE1(2,"[%p] Recved a node-level broadcast msg",CmiGetState());  
868         
869     #if DECOUPLE_BCAST_PROCESS
870         PCQueuePush(CsvAccess(nodeBcastQ), msg);
871     #else
872         ProcessNodeBroadcastMsg(msg);
873     #endif
874
875         MACHSTATE(2,"} PumpMsgsComplete end ");
876         return;
877     }
878 #endif
879
880 #endif
881
882 #if CMK_NODE_QUEUE_AVAILABLE
883     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
884         CmiPushNode(msg);
885     else{
886         MACHSTATE3(2,"[%p] Recv a p2p msg from pe=%d with seq no=%d", CmiGetState(), CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
887         CmiPushPE(CMI_DEST_RANK(msg), msg);
888     }
889 #else
890     CmiPushPE(CMI_DEST_RANK(msg), msg);
891 #endif
892
893     MACHSTATE(2,"} PumpMsgsComplete end ");
894     return;
895 }
896
897 /** lapi header handler: executed on the recv side, when the
898  *  first packet of the recving msg arrives, it is called to
899  *  prepare the memory buffer in the user space for recving the
900  *  data --Chao Mei
901  */
902 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
903                            void *hdr, uint *uhdr_len,
904                            lapi_return_info_t *msg_info,
905                            compl_hndlr_t **comp_h, void **comp_am_info) {
906     void *msg_buf;
907     MACHSTATE1(2,"[%p] PumpMsgsBegin begin {",CmiGetState());
908     /* prepare the space for receiving the data, set the completion handler to
909        be executed inline */
910     msg_buf = (void *)CmiAlloc(msg_info->msg_len);
911
912     msg_info->ret_flags = LAPI_SEND_REPLY;
913     *comp_h = PumpMsgsComplete;
914     *comp_am_info = msg_buf;
915     MACHSTATE(2,"} PumpMsgsBegin end");
916     return msg_buf;
917
918 }
919
920 /* The following two are lapi sender handlers */
921 static void ReleaseMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
922     MACHSTATE2(2,"[%p] ReleaseMsg begin %p {",CmiGetState(),msg);
923     check_lapi_err(info->reason, "ReleaseMsg", __LINE__);
924     CmiFree(msg);
925     MACHSTATE(2,"} ReleaseMsg end");
926 }
927
928 static void DeliveredMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
929     MACHSTATE1(2,"[%p] DeliveredMsg begin {",CmiGetState());
930     check_lapi_err(info->reason, "DeliveredMsg", __LINE__);
931     *((int *)msg) = *((int *)msg) - 1;
932     MACHSTATE(2,"} DeliveredMsg end");
933 }
934 /* ======End of lapi callbacks such as the completion handler on the sender and recver side ======*/
935
936
937 #if CMK_NODE_QUEUE_AVAILABLE
938 char *CmiGetNonLocalNodeQ(void) {
939     CmiState cs = CmiGetState();
940     char *result = 0;
941     CmiIdleLock_checkMessage(&cs->idle);
942     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
943         MACHSTATE2(3,"[%p] CmiGetNonLocalNodeQ begin %d {",CmiGetState(),CmiMyPe());
944         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
945         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
946         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
947         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
948     }
949     return result;
950 }
951 #endif
952
953 void *CmiGetNonLocal(void) {    
954     CmiState cs = CmiGetState();
955     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
956     CmiIdleLock_checkMessage(&cs->idle); 
957        
958 #if DECOUPLE_BCAST_PROCESS
959 #if !CMK_SMP
960     ProcessBroadcastMsg(0);
961 #elif CMK_SMP_NO_COMMTHD
962     if(CmiMyRank()==0) ProcessBroadcastMsg(0);
963 #endif
964 #endif
965            
966     void *msg = PCQueuePop(cs->recv);
967     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);    
968     return msg;
969
970 }
971
972 /**
973  * TODO: What will be the effects if calling LAPI_Probe in the 
974  * interrupt mode??? --Chao Mei 
975  */
976
977 /* user call to handle immediate message, since there is no ServerThread polling
978    messages (lapi does all the polling) every thread is authorized to process
979    immediate messages. If we are not in lapiInterruptMode check for progress.
980 */
981 void CmiMachineProgressImpl(){
982     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
983
984 #if CMK_IMMEDIATE_MSG
985     MACHSTATE1(2, "[%p] Handling Immediate Message begin {",CmiGetState());
986     CmiHandleImmediate();
987     MACHSTATE1(2, "[%p] Handling Immediate Message end }",CmiGetState());
988 #endif
989
990 #if CMK_SMP && !CMK_SMP_NO_COMMTHD && DECOUPLE_BCAST_PROCESS
991     if(CmiMyRank()==CmiMyNodeSize()) ProcessBroadcastMsg(0);
992 #endif
993 }
994
995 /*TODO: does lapi provide any Barrrier related functions as DCMF provides??? --Chao Mei */
996 /* Barrier needs to be implemented!!! -Chao Mei */
997 /* These two barriers are only needed by CmiTimerInit to synchronize all the
998    threads. They do not need to provide a general barrier. */
999 int CmiBarrier() {
1000     return 0;
1001 }
1002 int CmiBarrierZero() {
1003     return 0;
1004 }
1005
1006 /********************* MESSAGE SEND FUNCTIONS ******************/
1007
1008 /** 
1009  * "deliverable": used to know if the message can be encoded 
1010  * into the destPE queue withtout duplication (for usage of 
1011  * SMP). If it has already been duplicated (and therefore is 
1012  * deliverable), we do not want to copy it again, while if it 
1013  * has been copied we must do it before enqueuing it. 
1014  *  
1015  * The general send function for all Cmi send functions.
1016  */
1017 void lapiSendFn(int destPE, int size, char *msg, scompl_hndlr_t *shdlr, void *sinfo, int deliverable) {
1018     /* CmiState cs = CmiGetState(); */
1019     CmiUInt2  rank, node;
1020     lapi_xfer_t xfer_cmd;
1021
1022     MACHSTATE3(2,"lapiSendFn to destPE=%d with msg %p (isImm=%d) begin {",destPE,msg, CmiIsImmediate(msg));
1023     MACHSTATE3(2, "inside lapiSendFn 1: size=%d, sinfo=%p, deliverable=%d", size, sinfo, deliverable);
1024     
1025 #if ENSURE_MSG_PAIRORDER
1026     MACHSTATE3(2, "inside lapiSendFn 2: msg src->dest (%d->%d), seqno=%d", CMI_MSG_SRCPE(msg), destPE, CMI_MSG_SEQNO(msg));
1027 #endif
1028
1029     node = CmiNodeOf(destPE);
1030     /** 
1031      *  The rank of the msg should be set before calling
1032      *  lapiSendFn!!
1033      *  The rank could be DGRAM_NODEMESSAGE which indicates
1034      *  a node-level message.
1035      */
1036 #if CMK_SMP
1037     /*CMI_DEST_RANK(msg) = rank;*/
1038     if (node == CmiMyNode())  {
1039         rank = CmiRankOf(destPE);
1040         MACHSTATE2(2,"[%p] inside lapiSendFn for intra-node message (%p)",CmiGetState(), msg);
1041         if (deliverable) {
1042             CmiPushPE(rank, msg);
1043             /* the acknowledge of delivery must not be called */
1044         } else {
1045             CmiPushPE(rank, CopyMsg(msg, size));
1046             /* acknowledge that the message has been delivered */
1047             lapi_sh_info_t lapiInfo;
1048             lapiInfo.src = node;
1049             lapiInfo.reason = LAPI_SUCCESS;
1050             (*shdlr)(&lapiContext, sinfo, &lapiInfo);
1051         }
1052         return;
1053     }
1054 #endif
1055     
1056     MACHSTATE2(2, "Ready to call LAPI_Xfer with destPe=%d, destRank=%d",destPE,CMI_DEST_RANK(msg));
1057
1058     xfer_cmd.Am.Xfer_type = LAPI_AM_XFER;
1059     xfer_cmd.Am.flags     = 0;
1060     xfer_cmd.Am.tgt       = node;
1061     xfer_cmd.Am.hdr_hdl   = lapiHeaderHandler;
1062     xfer_cmd.Am.uhdr_len  = 0;
1063     xfer_cmd.Am.uhdr      = NULL;
1064     xfer_cmd.Am.udata     = msg;
1065     xfer_cmd.Am.udata_len = size;
1066     xfer_cmd.Am.shdlr     = shdlr;
1067     xfer_cmd.Am.sinfo     = sinfo;
1068     xfer_cmd.Am.tgt_cntr  = NULL;
1069     xfer_cmd.Am.org_cntr  = NULL;
1070     xfer_cmd.Am.cmpl_cntr = NULL;
1071
1072     check_lapi(LAPI_Xfer,(lapiContext, &xfer_cmd));
1073
1074     MACHSTATE(2,"} lapiSendFn end");
1075 }
1076
1077 static void CmiSendSelf(char *msg) {
1078     MACHSTATE1(3,"[%p] Sending itself a message {",CmiGetState());
1079
1080 #if CMK_IMMEDIATE_MSG
1081     if (CmiIsImmediate(msg)) {
1082         MACHSTATE1(3, "[%p] Push Immediate Message begin {",CmiGetState());        
1083         CmiPushImmediateMsg(msg);
1084         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());        
1085         return;
1086     }
1087 #endif
1088     CQdCreate(CpvAccess(cQdState), 1);
1089     CdsFifo_Enqueue(CmiGetState()->localqueue,msg);
1090
1091     MACHSTATE(3,"} Sending itself a message");
1092 }
1093
1094 void CmiSyncSendFn(int destPE, int size, char *msg) {
1095     CmiState cs = CmiGetState();
1096     char *dupmsg = CopyMsg(msg, size);
1097
1098     MACHSTATE1(3,"[%p] Sending sync message begin {",CmiGetState());
1099     CMI_BROADCAST_ROOT(dupmsg) = 0;
1100     CMI_DEST_RANK(dupmsg) = CmiRankOf(destPE);
1101
1102     if (cs->pe==destPE) {
1103         CmiSendSelf(dupmsg);
1104     } else {
1105     #if ENSURE_MSG_PAIRORDER
1106         CMI_MSG_SRCPE(dupmsg) = CmiMyPe();        
1107         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1108         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(dupmsg));
1109     #endif
1110
1111     #if ENABLE_CONVERSE_QD
1112         CQdCreate(CpvAccess(cQdState), 1);
1113     #endif
1114         lapiSendFn(destPE, size, dupmsg, ReleaseMsg, dupmsg, 1);
1115     }
1116     MACHSTATE(3,"} Sending sync message end");
1117 }
1118
1119 int CmiAsyncMsgSent(CmiCommHandle handle) {
1120     return (*((int *)handle) == 0)?1:0;
1121 }
1122
1123 void CmiReleaseCommHandle(CmiCommHandle handle) {
1124 #ifndef CMK_OPTIMIZE
1125     if (*((int *)handle) != 0) CmiAbort("Released a CmiCommHandle not free!");
1126 #endif
1127     free(handle);
1128 }
1129
1130 /* the CmiCommHandle returned is a pointer to the location of an int. When it is
1131    set to 1 the message is available. */
1132 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
1133     MACHSTATE1(3,"[%p] Sending async message begin {",CmiGetState());
1134     void *handle;
1135     CmiState cs = CmiGetState();
1136     CMI_BROADCAST_ROOT(msg) = 0;
1137     CMI_DEST_RANK(msg) = CmiRankOf(destPE);
1138
1139     /* if we are the destination, send ourself a copy of the message */
1140     if (cs->pe==destPE) {
1141         CmiSendSelf(CopyMsg(msg, size));
1142         MACHSTATE(3,"} Sending async message end");
1143         return 0;
1144     }
1145
1146     handle = malloc(sizeof(int));
1147     *((int *)handle) = 1;
1148
1149 #if ENSURE_MSG_PAIRORDER
1150     CMI_MSG_SRCPE(msg) = CmiMyPe();    
1151     CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1152     setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
1153 #endif
1154
1155 #if ENABLE_CONVERSE_QD
1156     CQdCreate(CpvAccess(cQdState), 1);
1157 #endif
1158     lapiSendFn(destPE, size, msg, DeliveredMsg, handle, 0);
1159     /* the message may have been duplicated and already delivered if we are in SMP
1160        mode and the destination is on the same node, but there is no optimized
1161        check for that. */
1162     MACHSTATE(3,"} Sending async message end");
1163     return handle;
1164 }
1165
1166 void CmiFreeSendFn(int destPE, int size, char *msg) {
1167     MACHSTATE1(3,"[%p] Sending sync free message begin {",CmiGetState());
1168     CmiState cs = CmiGetState();
1169     CMI_BROADCAST_ROOT(msg) = 0;
1170     CMI_DEST_RANK(msg) = CmiRankOf(destPE);
1171
1172     if (cs->pe==destPE) {
1173         CmiSendSelf(msg);
1174     } else {
1175     #if ENSURE_MSG_PAIRORDER
1176         CMI_MSG_SRCPE(msg) = CmiMyPe();        
1177         CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1178         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
1179     #endif
1180     #if ENABLE_CONVERSE_QD
1181         CQdCreate(CpvAccess(cQdState), 1);
1182     #endif        
1183         lapiSendFn(destPE, size, msg, ReleaseMsg, msg, 1);
1184         /*CmiAsyncSendFn(destPE, size, msg);*/
1185     }
1186     MACHSTATE(3,"} Sending sync free message end");
1187 }
1188
1189 /* ===========Node level p2p send functions============= */
1190 #if CMK_NODE_QUEUE_AVAILABLE
1191 static void CmiSendNodeSelf(char *msg) {
1192     CmiState cs;
1193     MACHSTATE1(3,"[%p] Sending itself a node message {",CmiGetState());
1194
1195 #if CMK_IMMEDIATE_MSG
1196     if (CmiIsImmediate(msg)) {
1197         MACHSTATE1(3, "[%p] Push Immediate Message {",CmiGetState());
1198         CMI_DEST_RANK(msg) = 0;
1199         CmiPushImmediateMsg(msg);
1200         MACHSTATE(3, "} Push Immediate Message end");
1201         return;
1202     }
1203 #endif
1204     CQdCreate(CpvAccess(cQdState), 1);
1205     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1206     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1207     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1208
1209     cs=CmiGetStateN(0);
1210     CmiIdleLock_addMessage(&cs->idle);
1211
1212     MACHSTATE(3,"} Sending itself a node message");
1213 }
1214
1215 /*TODO: not sure whether the in-order delivery affects for node messages?? --Chao Mei */
1216 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
1217     char *dupmsg = CopyMsg(msg, size);
1218
1219     MACHSTATE1(3,"[%p] Sending sync node message begin {",CmiGetState());
1220     CMI_BROADCAST_ROOT(dupmsg) = 0;
1221     CMI_DEST_RANK(dupmsg) = DGRAM_NODEMESSAGE;
1222
1223     if (CmiMyNode()==destNode) {
1224         CmiSendNodeSelf(dupmsg);
1225     } else {
1226     #if ENABLE_CONVERSE_QD
1227         CQdCreate(CpvAccess(cQdState), 1);
1228     #endif
1229         lapiSendFn(CmiNodeFirst(destNode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1230     }
1231     MACHSTATE(3,"} Sending sync node message end");
1232 }
1233
1234 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
1235     void *handle;
1236     CMI_BROADCAST_ROOT(msg) = 0;
1237     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1238
1239     MACHSTATE1(3,"[%p] Sending async node message begin {",CmiGetState());
1240     /* if we are the destination, send ourself a copy of the message */
1241     if (CmiMyNode()==destNode) {
1242         CmiSendNodeSelf(CopyMsg(msg, size));
1243         MACHSTATE(3,"} Sending async node message end");
1244         return 0;
1245     }
1246
1247     handle = malloc(sizeof(int));
1248     *((int *)handle) = 1;
1249
1250 #if ENABLE_CONVERSE_QD
1251     CQdCreate(CpvAccess(cQdState), 1);
1252 #endif
1253     lapiSendFn(CmiNodeFirst(destNode), size, msg, DeliveredMsg, handle, 0);
1254     /* the message may have been duplicated and already delivered if we are in SMP
1255        mode and the destination is on the same node, but there is no optimized
1256        check for that. */
1257     MACHSTATE(3,"} Sending async node message end");
1258     return handle;
1259
1260 }
1261
1262 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
1263     CMI_BROADCAST_ROOT(msg) = 0;
1264     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1265
1266     MACHSTATE1(3,"[%p] Sending sync free node message begin {",CmiGetState());
1267     if (CmiMyNode()==destNode) {
1268         CmiSendNodeSelf(msg);
1269     } else {
1270     #if ENABLE_CONVERSE_QD
1271         CQdCreate(CpvAccess(cQdState), 1);
1272     #endif
1273         lapiSendFn(CmiNodeFirst(destNode), size, msg, ReleaseMsg, msg, 1);
1274     }
1275     MACHSTATE(3,"} Sending sync free node message end");
1276 }
1277 #endif
1278
1279 /*********************** BROADCAST FUNCTIONS **********************/
1280 #if CMK_SMP
1281 /** 
1282   * Sending msgs to cores in the same node 
1283   * "includeSelf" indicates whether the msg should be sent to 
1284   * the proc of rank CMI_DEST_RANK(msg) 
1285   * --Chao mei 
1286   *  
1287   */
1288 void SendMsgToPeers(int size, char *msg, int includeSelf){
1289     if(includeSelf) {
1290         int i;
1291         for(i=0; i<CmiMyNodeSize(); i++) {
1292             char *dupmsg = CopyMsg(msg,size);
1293             CmiPushPE(i,dupmsg);
1294         }
1295     }else{
1296         int i;
1297         int excludeRank = CMI_DEST_RANK(msg);
1298         for(i=excludeRank+1; i<CmiMyNodeSize(); i++) {
1299             char *dupmsg = CopyMsg(msg,size);
1300             CmiPushPE(i,dupmsg);
1301         }
1302         for(i=0; i<excludeRank; i++) {
1303             char *dupmsg = CopyMsg(msg,size);
1304             CmiPushPE(i,dupmsg);
1305         }
1306     }
1307 }
1308 #endif
1309
1310 /** 
1311  * SendSpanningChildren only sends inter-node messages. The 
1312  * intra-node messages are delivered via SendMsgToPeers if it is 
1313  * in SMP mode. 
1314  */
1315 #if ENSURE_MSG_PAIRORDER
1316 void SendSpanningChildren(int size, char *msg, int srcPe, int *seqNoArr){
1317 #else
1318 void SendSpanningChildren(int size, char *msg) {
1319 #endif
1320     int startproc = CMI_BROADCAST_ROOT(msg)-1;
1321     int startnode = CmiNodeOf(startproc);
1322     int i, rp;
1323     char *dupmsg;
1324
1325     CmiAssert(startnode>=0 && startnode<CmiNumNodes());
1326     
1327     MACHSTATE3(3, "[%p] SendSpanningChildren on proc=%d with start proc %d begin {",CmiGetState(), CmiMyPe(), startproc);  
1328     /* rp is the relative node id from start node */  
1329     rp = CmiMyNode() - startnode;
1330     if(rp<0) rp += CmiNumNodes();
1331     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1332         int p = BROADCAST_SPANNING_FACTOR*rp + i;
1333         if (p > CmiNumNodes() - 1) break;
1334         p += startnode;
1335         p = p%CmiNumNodes();
1336         
1337 #if CMK_BROADCAST_USE_CMIREFERENCE
1338         CmiReference(msg);
1339         lapiSendFn(CmiNodeFirst(p), size, msg, ReleaseMsg, msg, 0);
1340 #else
1341         dupmsg = CopyMsg(msg, size);
1342     #if ENSURE_MSG_PAIRORDER
1343         CMI_MSG_SRCPE(dupmsg) = srcPe;
1344         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(seqNoArr, CmiNodeFirst(p));
1345         setNextMsgSeqNo(seqNoArr, CmiNodeFirst(p), CMI_MSG_SEQNO(dupmsg));
1346     #endif
1347         lapiSendFn(CmiNodeFirst(p), size, dupmsg, ReleaseMsg, dupmsg, 1);
1348 #endif
1349     }    
1350
1351     MACHSTATE3(3, "[%p] SendSpanningChildren on proc=%d with start proc %d end }",CmiGetState(), CmiMyPe(), startproc);    
1352 }
1353
1354 /* Return the smallest integer that is larger than or equal to log2(i), e.g CmiLog2(14) = 4*/
1355 static int CmiLog2 (int i) {
1356     int m;
1357     for (m=0; i>(1<<m); ++m);
1358     return m;
1359 }
1360
1361 /* send msg along the hypercube in broadcast. (Chao Mei) */
1362 #if ENSURE_MSG_PAIRORDER
1363 void SendHypercube(int size, char *msg, int srcPe, int *seqNoArr){
1364 #else
1365 void SendHypercube(int size, char *msg) {
1366 #endif
1367     int i, dist;   
1368     char *dupmsg;
1369     int dims = CmiLog2(CmiNumNodes());
1370     int startproc = CMI_BROADCAST_ROOT(msg)-1;
1371     int startnode = CmiNodeOf(startproc);
1372     /* relative proc id to startnode */
1373     int rp = CmiMyNode() - startnode;    
1374     if(rp < 0) rp += CmiNumNodes();
1375     dist = rp;
1376
1377     MACHSTATE3(3, "[%p] SendHypercube on proc=%d with start proc %d begin {",CmiGetState(), CmiMyPe(), startproc);    
1378     for(i=0; i<dims; i++) { 
1379         if((dist & 1) == 1) break;
1380
1381         /* destnode is still the relative node id from startnode */
1382         int destnode = rp + (1<<i);
1383         if(destnode > CmiNumNodes()-1) break; 
1384         
1385         destnode += startnode;
1386         destnode = destnode % CmiNumNodes();
1387
1388         CmiAssert(destnode != CmiMyNode());
1389                
1390 #if CMK_BROADCAST_USE_CMIREFERENCE
1391         CmiReference(msg);
1392         lapiSendFn(CmiNodeFirst(destnode), size, msg, ReleaseMsg, msg, 0);
1393 #else
1394         dupmsg = CopyMsg(msg, size);
1395     #if ENSURE_MSG_PAIRORDER
1396         CMI_MSG_SRCPE(dupmsg) = srcPe;
1397         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(seqNoArr, CmiNodeFirst(destnode));
1398         setNextMsgSeqNo(seqNoArr, CmiNodeFirst(destnode), CMI_MSG_SEQNO(dupmsg));
1399     #endif
1400         lapiSendFn(CmiNodeFirst(destnode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1401 #endif        
1402         dist = dist >> 1;    
1403     }    
1404     MACHSTATE3(3, "[%p] SendHypercube on proc=%d with start proc %d end }",CmiGetState(), CmiMyPe(), startproc);    
1405 }
1406
1407 void CmiSyncBroadcastGeneralFn(int size, char *msg) {    /* ALL_EXCEPT_ME  */
1408     int i, rank;
1409     MACHSTATE3(3,"[%p] Sending sync broadcast message %p with size %d begin {",CmiGetState(), msg, size);
1410
1411 #if ENABLE_CONVERSE_QD
1412     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1413 #endif
1414
1415 #if CMK_BROADCAST_SPANNING_TREE
1416     CMI_BROADCAST_ROOT(msg) = CmiMyPe()+1;
1417     /** 
1418      * since the broadcast msg will be relayed separately on 
1419      * remote procs by SendSpanningChildren, the actual msg size 
1420      * needs to be recorded in the header for proper memory 
1421      * allocation. E.g. the bcast msg is relayed in PumpMsgComplete.
1422      * But at that time, the actual msg size is not known if the 
1423      * filed inside the msg header is not set. The unknown actual 
1424      * msg size will cause the relay of msg fail because CopyMsg 
1425      * doesn't have a correct msg size input!! -Chao Mei 
1426      */
1427     CMI_MSG_SIZE(msg) = size;
1428     /* node-aware spanning tree, so bcast msg is always delivered to the first core on each node */
1429     CMI_DEST_RANK(msg) = 0;
1430 #if ENSURE_MSG_PAIRORDER    
1431     SendSpanningChildren(size, msg, CmiMyPe(), CpvAccess(bcastMsgSeqInfo).nextMsgSeqNo);
1432 #else
1433     SendSpanningChildren(size, msg);
1434 #endif
1435         
1436 #if CMK_SMP
1437     CMI_DEST_RANK(msg) = CmiMyRank();
1438     SendMsgToPeers(size, msg, 0);
1439 #endif
1440
1441 #elif CMK_BROADCAST_HYPERCUBE 
1442    
1443     CMI_BROADCAST_ROOT(msg) = CmiMyPe()+1;
1444     CMI_MSG_SIZE(msg) = size;
1445     CMI_DEST_RANK(msg) = 0;    
1446
1447 #if ENSURE_MSG_PAIRORDER    
1448     SendHypercube(size, msg, CmiMyPe(), CpvAccess(bcastMsgSeqInfo).nextMsgSeqNo);
1449 #else
1450     SendHypercube(size, msg);
1451 #endif
1452       
1453 #if CMK_SMP
1454     CMI_DEST_RANK(msg) = CmiMyRank();
1455     SendMsgToPeers(size, msg, 0);
1456 #endif
1457
1458 #else
1459     CmiState cs = CmiGetState();
1460     char *dupmsg;
1461
1462     CMI_BROADCAST_ROOT(msg) = 0;
1463 #if CMK_BROADCAST_USE_CMIREFERENCE
1464     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1465         CmiReference(msg);
1466         lapiSendFn(i, size, msg, ReleaseMsg, msg, 0);
1467         /*CmiSyncSendFn(i, size, msg) ;*/
1468     }
1469     for (i=0; i<cs->pe; i++) {
1470         CmiReference(msg);
1471         lapiSendFn(i, size, msg, ReleaseMsg, msg, 0);
1472         /*CmiSyncSendFn(i, size,msg) ;*/
1473     }
1474 #else
1475 #if ENSURE_MSG_PAIRORDER
1476     CMI_MSG_SRCPE(msg) = CmiMyPe();
1477 #endif
1478     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1479         dupmsg = CopyMsg(msg, size);
1480         CMI_DEST_RANK(dupmsg) = CmiRankOf(i);
1481     #if ENSURE_MSG_PAIRORDER        
1482         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i);
1483         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i, CMI_MSG_SEQNO(dupmsg));
1484     #endif
1485         lapiSendFn(i, size, dupmsg, ReleaseMsg, dupmsg, 1);        
1486     }
1487     for (i=0; i<cs->pe; i++) {
1488         dupmsg = CopyMsg(msg, size);
1489     #if ENSURE_MSG_PAIRORDER        
1490         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i);
1491         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i, CMI_MSG_SEQNO(dupmsg));
1492     #endif
1493         CMI_DEST_RANK(dupmsg) = CmiRankOf(i);
1494         lapiSendFn(i, size, dupmsg, ReleaseMsg, dupmsg, 1);        
1495     }
1496 #endif
1497 #endif
1498
1499     MACHSTATE(3,"} Sending sync broadcast message end");
1500 }
1501
1502 CmiCommHandle CmiAsyncBroadcastGeneralFn(int size, char *msg) {
1503 #if ENSURE_MSG_PAIRORDER
1504     /* Not sure how to add the msg seq no for async broadcast messages --Chao Mei */
1505     /* so abort here ! */
1506     CmiAssert(0);
1507     return 0;
1508 #else
1509     CmiState cs = CmiGetState();
1510     int i, rank;
1511 #if ENABLE_CONVERSE_QD
1512     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1513 #endif
1514     MACHSTATE1(3,"[%p] Sending async broadcast message from {",CmiGetState());
1515     CMI_BROADCAST_ROOT(msg) = 0;
1516     void *handle = malloc(sizeof(int));
1517     *((int *)handle) = CmiNumPes()-1;
1518
1519     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1520         CMI_DEST_RANK(msg) = CmiRankOf(i);
1521         lapiSendFn(i, size, msg, DeliveredMsg, handle, 0);
1522     }
1523     for (i=0; i<cs->pe; i++) {
1524         CMI_DEST_RANK(msg) = CmiRankOf(i);
1525         lapiSendFn(i, size, msg, DeliveredMsg, handle, 0);
1526     }
1527
1528     MACHSTATE(3,"} Sending async broadcast message end");
1529     return handle;
1530 #endif
1531 }
1532
1533 void CmiSyncBroadcastFn(int size, char *msg) {
1534     /*CMI_DEST_RANK(msg) = 0;*/
1535     CmiSyncBroadcastGeneralFn(size, msg);
1536 }
1537
1538 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1539     /*CMI_DEST_RANK(msg) = 0;*/
1540     return CmiAsyncBroadcastGeneralFn(size, msg);
1541 }
1542
1543 void CmiFreeBroadcastFn(int size, char *msg) {
1544     CmiSyncBroadcastFn(size,msg);
1545     CmiFree(msg);
1546 }
1547
1548 void CmiSyncBroadcastAllFn(int size, char *msg) {       /* All including me */
1549     CmiSendSelf(CopyMsg(msg, size));
1550     CmiSyncBroadcastFn(size, msg);
1551 }
1552
1553 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1554     CmiSendSelf(CopyMsg(msg, size));
1555     return CmiAsyncBroadcastFn(size, msg);
1556 }
1557
1558 void CmiFreeBroadcastAllFn(int size, char *msg) {       /* All including me */
1559     CmiSendSelf(CopyMsg(msg, size));
1560     CmiSyncBroadcastFn(size, msg);
1561     CmiFree(msg);
1562 }
1563
1564 #if CMK_NODE_QUEUE_AVAILABLE
1565 void SendSpanningChildrenNode(int size, char *msg){    
1566     int startnode = -CMI_BROADCAST_ROOT(msg)-1;    
1567     int i, rp;
1568     char *dupmsg;
1569
1570     MACHSTATE3(2, "[%p] SendSpanningChildrenNode on node %d with startnode %d", CmiGetState(), CmiMyNode(), startnode);
1571     CmiAssert(startnode>=0 && startnode<CmiNumNodes());
1572         
1573     rp = CmiMyNode() - startnode;
1574     if (rp<0) rp+=CmiNumNodes();        
1575     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1576         int p = BROADCAST_SPANNING_FACTOR*rp + i;
1577         if (p > CmiNumNodes() - 1) break;
1578         p += startnode;
1579         p = p%CmiNumNodes();
1580         
1581 #if CMK_BROADCAST_USE_CMIREFERENCE
1582         CmiReference(msg);
1583         lapiSendFn(CmiNodeFirst(p), size, msg, ReleaseMsg, msg, 0);
1584 #else
1585         dupmsg = CopyMsg(msg, size);
1586         lapiSendFn(CmiNodeFirst(p), size, dupmsg, ReleaseMsg, dupmsg, 1);
1587 #endif
1588     }
1589     MACHSTATE3(3, "[%p] SendSpanningChildrenNode on node=%d with start node %d end }",CmiGetState(), CmiMyNode(), startnode);
1590 }
1591
1592 /* send msg along the hypercube in broadcast. (Chao Mei) */
1593 void SendHypercubeNode(int size, char *msg) {
1594     int i, dist;   
1595     char *dupmsg;
1596     int dims = CmiLog2(CmiNumNodes());
1597     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1598     int rp = CmiMyNode() - startnode;    
1599     if(rp < 0) rp += CmiNumNodes();
1600     dist = rp;
1601
1602     MACHSTATE3(3, "[%p] SendHypercubeNode on node=%d with start node %d begin {",CmiGetState(), CmiMyNode(), startnode);
1603     for(i=0; i<dims; i++) { 
1604         if((dist & 1) == 1) break;
1605         
1606         int destnode = rp + (1<<i);
1607         if(destnode > CmiNumNodes()-1) break;
1608         
1609         destnode += startnode;
1610         destnode = destnode % CmiNumNodes();        
1611                         
1612         CmiAssert(destnode != CmiMyNode());
1613
1614 #if CMK_BROADCAST_USE_CMIREFERENCE
1615         CmiReference(msg);
1616         lapiSendFn(CmiNodeFirst(destnode), size, msg, ReleaseMsg, msg, 0);
1617 #else
1618         dupmsg = CopyMsg(msg, size);
1619         lapiSendFn(CmiNodeFirst(destnode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1620 #endif        
1621         dist = dist >> 1;    
1622     }
1623     MACHSTATE3(3, "[%p] SendHypercubeNode on node=%d with start node %d end }",CmiGetState(), CmiMyNode(), startnode);  
1624 }
1625
1626 void CmiSyncNodeBroadcastGeneralFn(int size, char *msg) {    /* ALL_EXCEPT_THIS_NODE  */     
1627 #if ENABLE_CONVERSE_QD
1628     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1629 #endif
1630 #if CMK_BROADCAST_SPANNING_TREE
1631
1632     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use spanning tree) begin {",CmiGetState());
1633     CMI_BROADCAST_ROOT(msg) = -(CmiMyNode()+1);
1634     CMI_MSG_SIZE(msg) = size;
1635     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;    
1636     SendSpanningChildrenNode(size, msg);
1637     
1638 #elif CMK_BROADCAST_HYPERCUBE
1639
1640     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use Hypercube) begin {",CmiGetState());
1641     CMI_BROADCAST_ROOT(msg) = -(CmiMyNode()+1);
1642     CMI_MSG_SIZE(msg) = size;
1643     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;    
1644     SendHypercubeNode(size, msg);
1645     
1646 #else      
1647     char *dupmsg;
1648     int i;
1649     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use p2p) begin {",CmiGetState());
1650
1651     CMI_BROADCAST_ROOT(msg) = 0;
1652     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1653     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {
1654         dupmsg = CopyMsg(msg, size);        
1655         lapiSendFn(CmiNodeFirst(i), size, dupmsg, ReleaseMsg, dupmsg, 1);        
1656     }
1657     for (i=0; i<CmiMyNode(); i++) {
1658         dupmsg = CopyMsg(msg, size);        
1659         lapiSendFn(CmiNodeFirst(i), size, dupmsg, ReleaseMsg, dupmsg, 1);        
1660     }
1661 #endif
1662     MACHSTATE(3,"} Sending sync node broadcast message end");
1663 }
1664
1665 CmiCommHandle CmiAsyncNodeBroadcastGeneralFn(int size, char *msg) {
1666     int i;
1667
1668 #if ENABLE_CONVERSE_QD
1669     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1670 #endif
1671
1672     MACHSTATE1(3,"[%p] Sending async node broadcast message from {",CmiGetState());
1673     CMI_BROADCAST_ROOT(msg) = 0;
1674     CMI_DEST_RANK(msg) =DGRAM_NODEMESSAGE;
1675     void *handle = malloc(sizeof(int));
1676     *((int *)handle) = CmiNumNodes()-1;
1677     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {        
1678         lapiSendFn(CmiNodeFirst(i), size, msg, DeliveredMsg, handle, 0);
1679     }
1680     for (i=0; i<CmiMyNode(); i++) {        
1681         lapiSendFn(CmiNodeFirst(i), size, msg, DeliveredMsg, handle, 0);
1682     }
1683
1684     MACHSTATE(3,"} Sending async broadcast message end");
1685     return handle;
1686 }
1687
1688 void CmiSyncNodeBroadcastFn(int size, char *msg) {
1689     /*CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;*/
1690     CmiSyncNodeBroadcastGeneralFn(size, msg);
1691 }
1692
1693 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
1694     /*CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;*/
1695     return CmiAsyncNodeBroadcastGeneralFn(size, msg);
1696 }
1697
1698 void CmiFreeNodeBroadcastFn(int size, char *msg) {
1699     CmiSyncNodeBroadcastFn(size, msg);
1700     CmiFree(msg);
1701 }
1702
1703 void CmiSyncNodeBroadcastAllFn(int size, char *msg) {
1704     CmiSendNodeSelf(CopyMsg(msg, size));
1705     CmiSyncNodeBroadcastFn(size, msg);
1706 }
1707
1708 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg) {
1709     CmiSendNodeSelf(CopyMsg(msg, size));
1710     return CmiAsyncNodeBroadcastFn(size, msg);
1711 }
1712
1713 void CmiFreeNodeBroadcastAllFn(int size, char *msg) {
1714     CmiSendNodeSelf(CopyMsg(msg, size));
1715     CmiSyncNodeBroadcastFn(size, msg);
1716     CmiFree(msg);
1717 }
1718 #endif
1719
1720 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1721 void CmiSyncListSendFn(int, int *, int, char*) {
1722
1723 }
1724
1725 CmiCommHandle CmiAsyncListSendFn(int, int *, int, char*) {
1726
1727 }
1728
1729 void CmiFreeListSendFn(int, int *, int, char*) {
1730
1731 }
1732 #endif
1733
1734 #if ! CMK_VECTOR_SEND_USES_COMMON_CODE
1735 void CmiSyncVectorSend(int, int, int *, char **) {
1736
1737 }
1738
1739 CmiCommHandle CmiAsyncVectorSend(int, int, int *, char **) {
1740
1741 }
1742
1743 void CmiSyncVectorSendAndFree(int, int, int *, char **) {
1744
1745 }
1746 #endif
1747
1748
1749 /************************** MAIN (non comm related functions) ***********************************/
1750
1751 void ConverseExit(void) {
1752     MACHSTATE2(2, "[%d-%p] entering ConverseExit begin {",CmiMyPe(),CmiGetState());
1753     
1754     /* TODO: Is it necessary to drive the network progress here?? -Chao Mei */
1755
1756     /* A barrier excluding the comm thread if present */
1757     CmiNodeBarrier();
1758
1759 #if CMK_SMP && !CMK_SMP_NO_COMMTHD
1760     /* Signal the comm thd to exit now! */
1761     if(CmiMyRank()==0) {
1762         commThdExit = 1;
1763     }
1764 #endif
1765
1766     ConverseCommonExit();
1767
1768 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1769     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
1770 #endif
1771
1772     CmiNodeBarrier();
1773
1774     MACHSTATE2(2, "[%d-%p] ConverseExit end }",CmiMyPe(),CmiGetState());
1775 #if CMK_SMP
1776     if(CmiMyRank()==0) {
1777         check_lapi(LAPI_Gfence, (lapiContext));
1778         check_lapi(LAPI_Term, (lapiContext));
1779         exit(EXIT_SUCCESS);
1780     }else{
1781         pthread_exit(NULL);
1782     }
1783 #else
1784     check_lapi(LAPI_Gfence, (lapiContext));      
1785     check_lapi(LAPI_Term, (lapiContext));
1786     exit(EXIT_SUCCESS);
1787 #endif
1788 }
1789
1790 /* Those be Cpved?? --Chao Mei */
1791 static char     **Cmi_argv;
1792 static CmiStartFn Cmi_startfn;   /* The start function */
1793 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1794
1795 /** 
1796  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
1797  *  When the proc is idle, the LAPI_Probe is called to make
1798  *  network progress.
1799  *  
1800  *  While in SMP mode, CmiNotifyStillIdle and CmiNotifyBeginIdle
1801  *  are used. Particularly, when idle, the frequency of calling
1802  *  lapi_probe (network progress) is given by "sleepMs"
1803  */
1804 void CmiNotifyIdle(void) {    
1805     AdvanceCommunication();
1806     CmiYield();
1807 }
1808
1809 typedef struct {
1810     int sleepMs; /*Milliseconds to sleep while idle*/
1811     int nIdles; /*Number of times we've been idle in a row*/
1812     CmiState cs; /*Machine state*/
1813 } CmiIdleState;
1814
1815 static CmiIdleState *CmiNotifyGetState(void) {
1816     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1817     s->sleepMs=0;
1818     s->nIdles=0;
1819     s->cs=CmiGetState();
1820     return s;
1821 }
1822
1823 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1824     s->sleepMs=0;
1825     s->nIdles=0;
1826 }
1827
1828 #define SPINS_BEFORE_SLEEP     20
1829
1830 static void CmiNotifyStillIdle(CmiIdleState *s) {
1831     MACHSTATE2(2,"[%p] still idle (%d) begin {",CmiGetState(),CmiMyPe());
1832     s->nIdles++;
1833     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1834         s->sleepMs+=2;
1835         if (s->sleepMs>10) s->sleepMs=10;
1836     }
1837     if (s->sleepMs>0) {
1838         MACHSTATE1(2,"idle sleep (%d) {",CmiMyPe());
1839         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1840         MACHSTATE1(2,"} idle sleep (%d)",CmiMyPe());
1841     }
1842
1843     AdvanceCommunication();
1844     
1845     MACHSTATE1(2,"still idle (%d) end }",CmiMyPe());
1846 }
1847
1848 #if MACHINE_DEBUG_LOG
1849 CpvDeclare(FILE *, debugLog);
1850 #endif
1851
1852
1853 #if ENSURE_MSG_PAIRORDER
1854 static void initMsgOrderInfo(MsgOrderInfo *info){
1855     int i;
1856     info->nextMsgSeqNo = malloc(CmiNumPes()*sizeof(int));
1857     memset(info->nextMsgSeqNo, 0, CmiNumPes()*sizeof(int));
1858     
1859     info->expectedMsgSeqNo = malloc(CmiNumPes()*sizeof(int));
1860     memset(info->expectedMsgSeqNo, 0, CmiNumPes()*sizeof(int));
1861     
1862     info->oooMsgBuffer = malloc(CmiNumPes()*sizeof(void **));
1863     memset(info->oooMsgBuffer, 0, CmiNumPes()*sizeof(void **));
1864     
1865     info->oooMaxOffset = malloc(CmiNumPes()*sizeof(unsigned char));
1866     memset(info->oooMaxOffset, 0, CmiNumPes()*sizeof(unsigned char));
1867
1868     info->CUR_WINDOW_SIZE = malloc(CmiNumPes()*sizeof(unsigned char));
1869     for(i=0; i<CmiNumPes(); i++) info->CUR_WINDOW_SIZE[i] = INIT_WINDOW_SIZE;
1870 }
1871 #endif
1872
1873 /* Only called from communication thread in SMP mode */
1874 static void CommunicationServer(int sleepTime) {
1875 #if CMK_SMP_NO_COMMTHD
1876     sleep(sleepTime);
1877 #else
1878     
1879     if(commThdExit) {
1880         MACHSTATE2(2, "[%d-%p] comm server exit begin {",CmiMyPe(),CmiGetState());
1881         ConverseCommonExit();        
1882         MACHSTATE2(2, "[%d-%p] comm server exit end }",CmiMyPe(),CmiGetState());
1883         pthread_exit(NULL);
1884     }
1885
1886     if(!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
1887
1888 #if CMK_IMMEDIATE_MSG
1889     /*MACHSTATE1(2, "[%p] Handling Immediate Message begin{",CmiGetState());*/
1890     CmiHandleImmediate();
1891     /*MACHSTATE1(2, "[%p] Handling Immediate Message end}",CmiGetState());*/
1892 #endif
1893
1894 #if DECOUPLE_BCAST_PROCESS
1895     /*MACHSTATE1(2, "[%p] Enter ProcessBroadcastMsg begin{",CmiGetState());*/
1896     ProcessBroadcastMsg(0);
1897     /*MACHSTATE1(2, "[%p] Enter ProcessBroadcastMsg end}",CmiGetState());*/
1898 #endif
1899
1900     /* sleep(sleepTime) */
1901 #endif
1902 }
1903
1904 static void ConverseRunPE(int everReturn) {
1905     CmiIdleState *s;
1906     char** CmiMyArgv;
1907     int i;
1908     CpvInitialize(void *,CmiLocalQueue);
1909     CpvInitialize(unsigned, networkProgressCount);
1910
1911     CpvInitialize(PCQueue, procBcastQ);
1912     CpvAccess(procBcastQ) = PCQueueCreate();
1913
1914 #if ENSURE_MSG_PAIRORDER
1915     CpvInitialize(MsgOrderInfo, p2pMsgSeqInfo);
1916     initMsgOrderInfo(&CpvAccess(p2pMsgSeqInfo));
1917
1918     CpvInitialize(MsgOrderInfo, bcastMsgSeqInfo); 
1919     initMsgOrderInfo(&CpvAccess(bcastMsgSeqInfo));    
1920 #endif
1921
1922 #if MACHINE_DEBUG_LOG
1923     {
1924         char ln[200];
1925         sprintf(ln,"debugLog.%d",CmiMyPe());
1926         CpvInitialize(FILE *, debugLog);
1927         CpvAccess(debugLog)=fopen(ln,"w");
1928     }
1929 #endif
1930
1931     /* To make sure cpvaccess is correct? -Chao Mei */
1932     CmiNodeAllBarrier();
1933
1934     /* Added by Chao Mei */
1935 #if CMK_SMP
1936     if(CmiMyRank()) {
1937         /* The master core of this node is already initialized */
1938         lapi_info_t info;
1939         int testnode, testnumnodes;
1940         memset(&info,0,sizeof(info));
1941         /*CpvInitialize(lapi_handle_t *, lapiContext);
1942         CpvAccess(lapiContext) = CpvAccessOther(lapiContext, 0)+CmiMyRank();
1943         CpvAccess(lapiContext) = CpvAccessOther(lapiContext, 0);
1944         */       
1945         MACHSTATE2(2, "My rank id=%d, lapicontext=%p", CmiMyRank(), &lapiContext);
1946
1947         check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, &testnode));
1948         check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, &testnumnodes));
1949
1950         MACHSTATE3(2, "My rank id=%d, Task id=%d, Num tasks=%d", CmiMyRank(), testnode, testnumnodes);
1951     }
1952 #endif
1953
1954
1955     MACHSTATE2(2, "[%d] ConverseRunPE (thread %p)",CmiMyRank(),CmiGetState());
1956         
1957     CmiNodeAllBarrier();
1958
1959     MACHSTATE(2, "After NodeBarrier in ConverseRunPE");
1960     
1961     CpvAccess(CmiLocalQueue) = CmiGetState()->localqueue;
1962
1963     if(CmiMyRank())
1964         CmiMyArgv=CmiCopyArgs(Cmi_argv);
1965     else
1966         CmiMyArgv=Cmi_argv;
1967
1968     CthInit(CmiMyArgv);
1969
1970     MACHSTATE(2, "After CthInit in ConverseRunPE");
1971
1972     ConverseCommonInit(CmiMyArgv);
1973
1974     MACHSTATE(2, "After ConverseCommonInit in ConverseRunPE");
1975
1976    /**
1977      * In SMP, the machine layer usually has one comm thd, and it is 
1978      * designed to be responsible for all network communication. So 
1979      * if there's no dedicated processor for the comm thread, it has 
1980      * to share a proc with a worker thread. In this scenario, 
1981      * the worker thread needs to yield for some time to give CPU 
1982      * time to comm thread. However, in current configuration, we 
1983      * will always dedicate one proc for the comm thd, therefore, 
1984      * such yielding scheme is not necessary.  Besides, avoiding 
1985      * this yielding scheme improves performance because worker 
1986      * thread doesn't need to yield and will be more responsive to 
1987      * incoming messages. So, we will always use CmiNotifyIdle 
1988      * instead. 
1989      *  
1990      * --Chao Mei
1991      */
1992 #if 0 && CMK_SMP
1993     s=CmiNotifyGetState();
1994     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1995     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1996 #else
1997     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1998 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
1999     /* If there's comm thread, then comm thd is responsible for advancing comm */
2000     if(!CsvAccess(lapiInterruptMode)) {
2001         CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn)AdvanceCommunication, NULL);
2002     }
2003 #endif
2004 #endif
2005
2006 #if CMK_IMMEDIATE_MSG
2007     /* Converse initialization finishes, immediate messages can be processed.
2008        node barrier previously should take care of the node synchronization */
2009     _immediateReady = 1;
2010 #endif
2011
2012     /* communication thread */
2013     if (CmiMyRank() == CmiMyNodeSize()) {
2014         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2015         MACHSTATE2(3, "[%p]: Comm thread on node %d is going to be a communication server", CmiGetState(), CmiMyNode());            
2016         while(1) CommunicationServer(5);        
2017     } else { /* worker thread */
2018         if (!everReturn) {
2019             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2020             MACHSTATE1(3, "[%p]: Worker thread is going to work", CmiGetState());
2021             if (Cmi_usrsched==0) CsdScheduler(-1);
2022             ConverseExit();
2023         }
2024     }
2025 }
2026
2027 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
2028     int n,i;
2029
2030     lapi_info_t info;
2031
2032 #if ENSURE_MSG_PAIRORDER
2033    cmplHdlrThdLock = CmiCreateLock();
2034 #endif
2035
2036     /* processor per node */
2037     /** 
2038      *  We have to determin the ppn at this point in order to create
2039      *  the corresponding number of lapiContext instances.
2040      */
2041 #if CMK_SMP
2042     CmiMyNodeSize() = 1;
2043     CmiGetArgInt(argv,"+ppn", &CmiMyNodeSize());
2044 #else
2045     if (CmiGetArgFlag(argv,"+ppn")) {
2046         CmiAbort("+ppn cannot be used in non SMP version!\n");
2047     }
2048 #endif
2049
2050     memset(&info,0,sizeof(info));
2051
2052     /* Register error handler (redundant?) -- added by Chao Mei*/
2053     info.err_hndlr = (LAPI_err_hndlr *)lapi_err_hndlr;
2054
2055     /* Indicates the number of completion handler threads to create */
2056     /* The number of completion hndlr thds will affect the atomic PCQueue operations!! */
2057     /* NOTE: num_compl_hndlr_thr is obsolete now! --Chao Mei */
2058     /* info.num_compl_hndlr_thr = 1; */
2059
2060     check_lapi(LAPI_Init,(&lapiContext, &info));
2061     
2062     /* It's a good idea to start with a fence,
2063        because packets recv'd before a LAPI_Init are just dropped. */
2064     check_lapi(LAPI_Gfence,(lapiContext));
2065
2066     check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, &CmiMyNode()));
2067     check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, &CmiNumNodes()));
2068
2069     /* Make polling as the default mode as real apps have better perf */
2070     CsvAccess(lapiInterruptMode) = 0;
2071     if(CmiGetArgFlag(argv,"+poll")) CsvAccess(lapiInterruptMode) = 0;
2072     if(CmiGetArgFlag(argv,"+nopoll")) CsvAccess(lapiInterruptMode) = 1;    
2073
2074     check_lapi(LAPI_Senv,(lapiContext, ERROR_CHK, lapiDebugMode));
2075     check_lapi(LAPI_Senv,(lapiContext, INTERRUPT_SET, CsvAccess(lapiInterruptMode)));
2076
2077     if(CmiMyNode()==0) {
2078         printf("Running lapi in interrupt mode: %d\n", CsvAccess(lapiInterruptMode));
2079         printf("Running lapi with %d completion handler threads.\n", info.num_compl_hndlr_thr);
2080     }
2081
2082     /** 
2083      *  Associate PumpMsgsBegin with var "lapiHeaderHandler". Then inside Xfer calls,
2084      *  lapiHeaderHandler could be used to indicate the callback
2085      *  instead of PumpMsgsBegin --Chao Mei
2086      */
2087     check_lapi(LAPI_Addr_set,(lapiContext,(void *)PumpMsgsBegin,lapiHeaderHandler));
2088
2089     CmiNumPes() = CmiNumNodes() * CmiMyNodeSize();
2090     Cmi_nodestart = CmiMyNode() * CmiMyNodeSize();
2091
2092     Cmi_argv = argv;
2093     Cmi_startfn = fn;
2094     Cmi_usrsched = usched;
2095
2096     if (CmiGetArgFlag(argv,"++debug")) {  /*Pause so user has a chance to start and attach debugger*/
2097         printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
2098         if (!CmiGetArgFlag(argv,"++debug-no-pause"))
2099             sleep(30);
2100     }
2101
2102     CsvInitialize(CmiNodeState, NodeState);
2103     CmiNodeStateInit(&CsvAccess(NodeState));
2104
2105 #if CMK_NODE_QUEUE_AVAILABLE
2106     CsvInitialize(PCQueue, nodeBcastQ);
2107     CsvAccess(nodeBcastQ) = PCQueueCreate();
2108 #endif
2109
2110     CmiStartThreads(argv);
2111
2112     ConverseRunPE(initret);
2113 }
2114
2115 /***********************************************************************
2116  *
2117  * Abort function:
2118  *
2119  ************************************************************************/
2120
2121 void CmiAbort(const char *message) {
2122     CmiError(message);
2123     LAPI_Term(lapiContext);
2124     exit(1);
2125 }
2126
2127 static void PerrorExit(const char *msg) {
2128     perror(msg);
2129     LAPI_Term(lapiContext);
2130     exit(1);
2131 }
2132