868e17af0d4d485ed6dcecb1e9991bf72a0e4e3f
[charm.git] / src / arch / lapi / machine.c
1 /*****************************************************************************
2 LAPI version of machine layer
3 Based on the template machine layer
4
5 Developed by
6 Filippo Gioachin   03/23/05
7 Chao Mei 01/28/2010
8 ************************************************************************/
9
10 #include <lapi.h>
11
12 #include "converse.h"
13
14 #include <assert.h>
15 #include <errno.h>
16
17 /*Support for ++debug: */
18 #include <unistd.h> /*For getpid()*/
19 #include <stdlib.h> /*For sleep()*/
20
21 #include "machine.h"
22
23 /* Read the following carefully before building the machine layer on LAPI */
24 /* =========BEGIN OF EXPLANATION OF MACRO USAGE=============*/
25 /**
26  * 1. non-SMP mode: 
27  *   CMK_SMP = 0; 
28  *   CMK_PCQUEUE_LOCK = 1; (could be removed if memory fence and atomic ops are used) 
29  *  
30  *   (The following two could be disabled to reduce the overhead of machine layer) 
31  *     ENSURE_MSG_PAIRORDER = 0|1; 
32  *     ENABLE_CONVERSE_QD = 0|1; 
33  *  
34  *   (If ENSURE_MSG_PAIRORDER is 1, then setting DECOUPLE_BCAST_PROCESS to 1
35  *   will make the msg seqno increase at step of 1 w/o data race;
36  *     DECOUPLE_BCAST_PROCESS = 0|1;
37  * =========================================================== 
38  * 2. SMP mode without comm thd:
39  *    CMK_SMP = 1;
40  *    CMK_PCQUEUE_LOCK = 1;
41  *    CMK_SMP_NO_COMMTHD = 1;
42  *  
43  *    ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
44  *  
45  *    DECOUPLE_BCAST_PROCESS has same options as in non-SMP mode;
46  * =========================================================== 
47  *  3. SMP mode with comm thd:
48  *     CMK_SMP = 1;
49  *     CMK_PCQUEUE_LOCK = 1;
50  *     CMK_SMP_NO_COMMTHD = 0;
51  *  
52  *     ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
53  *  
54  *     (The following must be set with 1 as bcast msg is dealt with in comm thd!)
55  *     DECOUPLE_BCAST_PROCESS = 1;
56  *  ===========================================================
57  *  
58  *  Assumptions we made in different mode:
59  *  1. non-SMP:
60  *     a) imm msgs are processed when the proc is idle, and periodically. They should
61  *        never be processed in the LAPI thread;
62  *     b) forwarding msgs could be done on the proc or in the internal LAPI
63  *        completion handler threads;
64  *  2. SMP w/o comm thd:
65  *     a) same with non-SMP a)
66  *     b) forwarding bcast msgs could be done on proc whose rank=0;
67  *        (enable DECOUPLE_BCAST_PROCESS)  or in internal LAPI completion
68  *        handler threads;
69  *     c) the destination rank of proc-level bcast msg is always 0;
70  *  3. SMP w/ comm thd:
71  *     a) imm msgs are processed in comm thd;
72  *     b) forwarding bcast msgs is done in comm thd;
73  *     c) same with 2 c)
74  *  
75  */
76 /* =========END OF EXPLANATION OF MACRO USAGE=============*/
77
78 #if CMK_SMP
79 #define CMK_PCQUEUE_LOCK 1
80 #else
81 /** 
82  *  In non-smp case: the LAPI completion handler thread will
83  *  also access the proc's recv queue (a PCQueue), so the queue
84  *  needs to be protected. The number of producers equals the
85  *  #completion handler threads, while there's only one consumer
86  *  for the queue. Right now, the #completion handler threads is
87  *  set to 1, so the atomic operation for PCQueue should be
88  *  achieved via memory fence. --Chao Mei
89  */
90
91 /* Redefine CmiNodeLocks only for PCQueue data structure */
92 #define CmiNodeLock CmiNodeLock_nonsmp
93 #undef CmiCreateLock
94 #undef CmiLock
95 #undef CmiUnlock
96 #undef CmiTryLock
97 #undef CmiDestroyLock
98 typedef pthread_mutex_t *CmiNodeLock_nonsmp;
99 CmiNodeLock CmiCreateLock(){
100   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));  
101   pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
102   return lk;
103 }
104 #define CmiLock(lock) (pthread_mutex_lock(lock))
105 #define CmiUnlock(lock) (pthread_mutex_unlock(lock))
106 #define CmiTryLock(lock) (pthread_mutex_trylock(lock))
107 void CmiDestroyLock(CmiNodeLock lock){
108     pthread_mutex_destroy(lock);
109     free(lock);
110 }
111 #define CMK_PCQUEUE_LOCK 1
112 #endif
113 #include "pcqueue.h"
114
115 /** 
116  *  The converse qd is rarely used in current charm apps, so the
117  *  counter for converse qd could be disabled for max
118  *  performance. --Chao Mei
119  */
120 #define ENABLE_CONVERSE_QD 1
121
122 #if CMK_SMP
123 /** 
124  *  The macro defines whether to have a comm thd to offload some
125  *  work such as processing immdiate messages, forwarding
126  *  broadcast messages etc. This macro should be defined before
127  *  including "machine-smp.c".
128  *  --Chao Mei
129  */
130 #define CMK_SMP_NO_COMMTHD 0
131 #if CMK_SMP_NO_COMMTHD
132 int Cmi_commthread = 0;
133 #else
134 int Cmi_commthread = 1;
135 #endif
136
137 #endif
138
139 /** 
140  *  Enable this macro will offload the broadcast relay from the
141  *  internal completion handler thread. This will make the msg
142  *  seqno free of data-race. In SMP mode with comm thread where
143  *  comm thread will forward bcast msgs, this macro should be
144  *  enabled.
145  */
146 #define DECOUPLE_BCAST_PROCESS 1
147
148 /** 
149  * #####REGARDING IN-ORDER MESSAGE DELIVERY BETWEEN A PAIR OF 
150  * PROCESSORS#####: 
151  *  
152  * Since the lapi doesn't guarantee the order of msg delivery 
153  * (via network) between a pair of processors, we need to ensure 
154  * this order via msg seq no and a window sliding based msg 
155  * receiving scheme. So two extra fields are added to the basic 
156  * msg header: srcPe and seqno. For node messages, we process it 
157  * like a msg to be delivered to the first proc (rank=0) of that 
158  * node. 
159  *  
160  * BTW: The in-order delivery between two processors in the same
161  * node is guaranteed in the SMP mode as the msg transfer 
162  * doesn't go through LAPI. 
163  *  
164  * The msg transferred through LAPI (via network) is always 
165  * delivered to the first proc (whose rank is 0) on that node! 
166  *  
167  * --Chao Mei 
168  */
169 #define ENSURE_MSG_PAIRORDER 1
170
171 #if ENSURE_MSG_PAIRORDER
172
173 #define MAX_MSG_SEQNO 65535
174 /* MAX_WINDOW_SIZE should be smaller than MAX_MSG_SEQNO, and MAX(unsigned char) */
175 #define MAX_WINDOW_SIZE 128
176 #define INIT_WINDOW_SIZE 8
177
178 /* The lock to ensure the completion handler (PumpMsgsComplete) is thread-safe */
179 CmiNodeLock cmplHdlrThdLock = NULL;
180
181 /** 
182  *  expectedMsgSeqNo is an int array of size "#procs". It tracks
183  *  the expected seqno recved from other procs to this proc.
184  *  
185  *  nextMsgSeqNo is an int array of size "#procs". It tracks
186  *  the next seqno of the msg to be sent from this proc to other
187  *  procs.
188  *  
189  *  oooMsgBuffer is an array of sizeof(void **)*(#procs), each
190  * element (created on demand) points to a window (array) size 
191  * of CUR_WINDOW_SIZE which buffers the out-of-order incoming 
192  * messages (a (void *) array) 
193  *  
194  * oooMaxOffset indicates the maximum offset of the ooo msg 
195  * ahead of the expected msg. The offset begins with 1, i.e., 
196  * (offset-1) is the index of the ooo msg in the window 
197  * (oooMsgBuffer) 
198  *  
199  *  --Chao Mei
200  */
201
202 typedef struct MsgOrderInfoStruct{
203     /* vars used on sender side */
204     int *nextMsgSeqNo;
205
206     /* vars used on recv side */
207     int *expectedMsgSeqNo;        
208     void ***oooMsgBuffer;
209     unsigned char *oooMaxOffset;
210     unsigned char *CUR_WINDOW_SIZE;
211 }MsgOrderInfo;
212
213 /** 
214  *  broadcast msgs and p2p msgs uses different tracks of seq no
215  *  as is the case with "net" layers.
216  */
217 CpvDeclare(MsgOrderInfo, p2pMsgSeqInfo);
218 CpvDeclare(MsgOrderInfo, bcastMsgSeqInfo);
219
220
221 #endif
222
223 /*
224     To reduce the buffer used in broadcast and distribute the load from
225   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
226   spanning tree broadcast algorithm.
227     This will use the fourth short in message as an indicator of spanning tree
228   root.
229 */
230
231 #undef CMK_BROADCAST_SPANNING_TREE
232 #define CMK_BROADCAST_SPANNING_TREE 0
233
234 /*#define BROADCAST_SPANNING_FACTOR        CMK_SPANTREE_MAXSPAN*/
235 #define BROADCAST_SPANNING_FACTOR   2
236
237 #undef CMK_BROADCAST_HYPERCUBE
238 #define CMK_BROADCAST_HYPERCUBE     1
239
240 /**
241  * The broadcast root of a msg. 
242  * 0: indicate a non-broadcast msg 
243  *  
244  * >0: indicate a proc-level broadcast msg ("root-1" indicates 
245  * the proc that starts a broadcast)
246  *  
247  * <0: indicate a node-level broadcast msg ("-root-1" indicates 
248  * the node that starts a broadcast) 
249  *  
250  * On BG/P, we have a separate broadcast queue on each 
251  * processor. This will allow us to use the separate thread 
252  * (comm thread) to offload the broadcast operation from worker 
253  * threads --Chao Mei 
254  */
255 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
256 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
257 /* The actual msg size including the msg header */
258 #define CMI_MSG_SIZE(msg)                ((CmiMsgHeaderBasic *)msg)->size
259
260 #define CMI_MSG_SRCPE(msg)               ((CmiMsgHeaderBasic *)msg)->srcpe
261 #define CMI_MSG_SEQNO(msg)               ((CmiMsgHeaderBasic *)msg)->seqno
262
263 CpvDeclare(unsigned , networkProgressCount);
264 int networkProgressPeriod = 1000;
265
266 static int lapiDebugMode=0;
267 CsvDeclare(int, lapiInterruptMode);
268
269 static void ConverseRunPE(int everReturn);
270 static void PerrorExit(const char *msg);
271
272 static int Cmi_nodestart;   /* First processor in this node - stupid need due to machine-smp.h that uses it!!  */
273
274 static volatile int commThdExit = 0;
275
276 #include "machine-smp.c"
277
278
279 /* Variables describing the processor ID */
280
281 /* non-smp mode */
282 #if CMK_SHARED_VARS_UNAVAILABLE
283 /* Should the non-smp also needs the concept of node which equals to processor -Chao Mei */
284 int _Cmi_mype;
285 int _Cmi_numpes;
286 int _Cmi_myrank;
287
288 void CmiMemLock() {}
289 void CmiMemUnlock() {}
290
291 static struct CmiStateStruct Cmi_state;
292
293 #define CmiGetState() (&Cmi_state)
294 #define CmiGetStateN(n) (&Cmi_state)
295
296 void CmiYield() {
297     sleep(0);
298 }
299
300 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
301
302 int _Cmi_numpes;
303 int _Cmi_mynodesize;
304 int _Cmi_mynode;
305 int _Cmi_numnodes;
306
307 int CmiMyPe(void) {
308     return CmiGetState()->pe;
309 }
310
311 int CmiMyRank(void) {
312     return CmiGetState()->rank;
313 }
314
315 int CmiNodeFirst(int node) {
316     return node*_Cmi_mynodesize;
317 }
318 int CmiNodeSize(int node)  {
319     return _Cmi_mynodesize;
320 }
321
322 int CmiNodeOf(int pe)      {
323     return (pe / _Cmi_mynodesize);
324 }
325 int CmiRankOf(int pe)      {
326     return (pe % _Cmi_mynodesize);
327 }
328 #endif
329
330 CpvDeclare(void*, CmiLocalQueue);
331
332 #if DECOUPLE_BCAST_PROCESS
333 CpvDeclare(PCQueue, procBcastQ);
334 #if CMK_NODE_QUEUE_AVAILABLE
335 CsvDeclare(PCQueue, nodeBcastQ);
336 #endif
337 #endif
338
339 #if CMK_NODE_QUEUE_AVAILABLE
340 #define DGRAM_NODEMESSAGE   (0x7EFB)
341 #define NODE_BROADCAST_OTHERS (-1)
342 #define NODE_BROADCAST_ALL    (-2)
343 #endif
344
345 /* The way to read this macro
346  * "routine args" becomes a function call inside the parameters of check_lapi_err,
347  * and it returns a int as returnCode;
348  * "#routine" turns the "routine" as a string
349  * __LINE__ is the line number in the source file
350  * -Chao Mei
351  */
352 #define check_lapi(routine,args) \
353         check_lapi_err(routine args, #routine, __LINE__);
354
355 static void check_lapi_err(int returnCode,const char *routine,int line) {
356     if (returnCode!=LAPI_SUCCESS) {
357         char errMsg[LAPI_MAX_ERR_STRING];
358         LAPI_Msg_string(returnCode,errMsg);
359         fprintf(stderr,"Fatal LAPI error while executing %s at %s:%d\n"
360                 "  Description: %s\n", routine, __FILE__, line, errMsg);
361         CmiAbort("Fatal LAPI error");
362     }
363 }
364
365 static void lapi_err_hndlr(lapi_handle_t *hndl, int *error_code, 
366                             lapi_err_t *err_type, int *task_ID, int *src){
367     char errstr[LAPI_MAX_ERR_STRING];
368     LAPI_Msg_string(*error_code, errstr);
369     fprintf(stderr, "ERROR IN LAPI: %s for task %d at src %d\n", errstr, *task_ID, *src);
370     LAPI_Term(*hndl);
371     exit(1);
372 }
373
374 /**
375  * The lapiContext stands for the lapi context for a single lapi 
376  * task. And inside one lapi task, only one lapi context could 
377  * be created via lapi_init. In SMP mode, this context is 
378  * created by proc of rank 0, and then it is shared among all 
379  * cores on a node (threads) --Chao Mei 
380  * 
381  */
382 static lapi_handle_t lapiContext;
383 static lapi_long_t lapiHeaderHandler = 1;
384
385 /**
386  * Note on broadcast functions: 
387  * The converse QD may be wrong if using spanning tree or 
388  * hypercube schemes to send messages --Chao Mei 
389  */
390 void SendMsgToPeers(int size, char *msg, int includeSelf);
391 #if ENSURE_MSG_PAIRORDER
392 void SendSpanningChildren(int size, char *msg, int srcPe, int *seqNoArr);
393 void SendHypercube(int size, char *msg, int srcPe, int *seqNoArr);
394 #else
395 void SendSpanningChildren(int size, char *msg);
396 void SendHypercube(int size, char *msg);
397 #endif
398
399 #if CMK_NODE_QUEUE_AVAILABLE
400 /** 
401  *  The sending schemes of the following two functions are very
402  *  similar to its corresponding proc-level functions
403  */
404 void SendSpanningChildrenNode(int size, char *msg);
405 void SendHypercubeNode(int size, char *msg);
406 #endif
407
408 /**
409  * There is a function "CkCopyMsg" in the charm++ level, which
410  * considers more than mere memcpy as there could be var size msg
411  * --Chao Mei
412  */
413 char *CopyMsg(char *msg, int len) {
414     char *copy = (char *)CmiAlloc(len);
415     if (!copy)
416         fprintf(stderr, "Out of memory\n");
417     memcpy(copy, msg, len);
418     return copy;
419 }
420  
421 CsvDeclare(CmiNodeState, NodeState);
422
423 #if CMK_IMMEDIATE_MSG
424 #include "immediate.c"
425 #endif
426
427 /** 
428  *  This function will be never called in the comm thd!
429  *  It is only used in the non-SMP mode, or in SMP mode w/o comm
430  *  thd! --Chao Mei
431  */
432 static void AdvanceCommunication(){
433 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
434
435     if(!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
436 #if CMK_IMMEDIATE_MSG
437     /** 
438      * Immediate msgs are handled in CmiPushNode and 
439      * CmiSendNodeSelf, CmiPushPe and CmiSendSelf in the non-SMP 
440      * case, but in SMP case, those four functions could be called 
441      * in the completition handler where "CmiMyPe(), CmiMyRank()" 
442      * will be wrong as the "CmiGetState()" will not return a right 
443      * proc-specific CmiState!! So immediate messages are handled 
444      * when proc is idle. This may cause a big delay for processing 
445      * immdiate messages in SMP mode if there's not a dedicated 
446      * communication thread. 
447      *  
448      * Even in non-SMP, inside imm msg handlers, array proxy 
449      * messages could be sent. If handled in LAPI internal 
450      * completion thread, it will cause data-racing problems for 
451      * data structures that maintain charm array info. So it needs 
452      * to be handled in the context of a proc. 
453      *  
454      * -Chao Mei 
455      */
456     MACHSTATE1(2, "[%p] Handling Immediate Message begin{",CmiGetState());
457     CmiHandleImmediate();
458     MACHSTATE1(2, "[%p] Handling Immediate Message end}",CmiGetState());
459 #endif
460
461 #endif
462 }
463
464 /* non-smp CmiStartThreads. -Chao Mei */
465 #if CMK_SHARED_VARS_UNAVAILABLE
466 /* To conform with the call made in SMP mode */
467 static void CmiStartThreads(char **argv) {
468     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
469 }
470 #endif
471
472 /* ===== Beginging of functions regarding ensure in-order msg delivery ===== */
473 #if ENSURE_MSG_PAIRORDER
474
475 /**
476  * "setNextMsgSeqNo" actually sets the current seqno, the 
477  * "getNextMsgSeqNo" will increment the seqno, i.e., 
478  * "getNextMgSeqNo" returns the next seqno based on the previous 
479  * seqno stored in the seqno array. 
480  * --Chao Mei 
481  */
482 static int getNextMsgSeqNo(int *seqNoArr, int destPe){
483     int ret = seqNoArr[destPe];
484     ret++;
485     return ret;
486 }
487 static void setNextMsgSeqNo(int *seqNoArr, int destPe, int val){
488     /* the seq no. may fast-forward to a new round (i.e., starting from 1 again!) */
489     if(val>=MAX_MSG_SEQNO) val -= MAX_MSG_SEQNO;
490     seqNoArr[destPe] = val;
491 }
492
493 #define getNextExpectedMsgSeqNo(seqNoArr,srcPe) getNextMsgSeqNo(seqNoArr, srcPe)
494 #define setNextExpectedMsgSeqNo(seqNoArr,srcPe,val) setNextMsgSeqNo(seqNoArr, srcPe, val)
495
496 #endif
497 /* ===== End of functions regarding ensure in-order msg delivery ===== */
498
499 /* ===========CmiPushPe and CmiPushNode============*/
500 /* Add a message to this processor's receive queue, pe is a rank */
501 void CmiPushPE(int pe,void *msg) {
502     CmiState cs = CmiGetStateN(pe);
503     MACHSTATE3(3,"Pushing message(%p) into rank %d's queue %p {",msg,pe,(cs->recv));
504
505 #if CMK_IMMEDIATE_MSG
506     if (CmiIsImmediate(msg)) {
507         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
508         CMI_DEST_RANK(msg) = pe;
509         CmiPushImmediateMsg(msg);
510         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());
511         return;
512     }
513 #endif
514
515     /* Note atomicity is guaranteed inside pcqueue data structure --Chao Mei */
516     PCQueuePush(cs->recv,msg);
517
518     CmiIdleLock_addMessage(&cs->idle);
519     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
520 }
521
522 #if CMK_NODE_QUEUE_AVAILABLE
523 /*Add a message to this processor's receive queue */
524 /*Note: CmiPushNode is essentially same with CimSendNodeSelf */
525 static void CmiPushNode(void *msg) {    
526     MACHSTATE1(3,"[%p] Pushing message into NodeRecv queue",CmiGetState());
527
528 #if CMK_IMMEDIATE_MSG
529     if (CmiIsImmediate(msg)) {
530         MACHSTATE1(3, "[%p] Push Immediate Message begin {",CmiGetState());
531         CMI_DEST_RANK(msg) = 0;
532         CmiPushImmediateMsg(msg);
533         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());
534         return;
535     }
536 #endif
537     /* CmiNodeRecvLock may not be needed  --Chao Mei*/
538     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
539     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
540     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
541
542     CmiState cs=CmiGetStateN(0);
543     CmiIdleLock_addMessage(&cs->idle);
544
545     MACHSTATE(3,"Pushing message into NodeRecv queue {");
546 }
547 #endif
548
549 /* ======Beginning of helper functions for processing an incoming (network) message ======*/
550 /* Process a proc-level broadcast message */
551 static void ProcessProcBroadcastMsg(char *msg){
552     int nbytes = CMI_MSG_SIZE(msg);    
553 #if ENSURE_MSG_PAIRORDER
554     MACHSTATE3(2,"[%p] the broadcast msg is from pe=%d with seq no=%d", CmiGetState(), CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
555     #if CMK_BROADCAST_SPANNING_TREE    
556     SendSpanningChildren(nbytes, msg, CmiNodeFirst(CmiMyNode()), CpvAccessOther(bcastMsgSeqInfo, 0).nextMsgSeqNo);
557     #elif CMK_BROADCAST_HYPERCUBE    
558     SendHypercube(nbytes, msg, CmiNodeFirst(CmiMyNode()), CpvAccessOther(bcastMsgSeqInfo, 0).nextMsgSeqNo);
559     #endif
560 #else
561     #if CMK_BROADCAST_SPANNING_TREE
562     SendSpanningChildren(nbytes, msg);
563     #elif CMK_BROADCAST_HYPERCUBE
564     SendHypercube(nbytes, msg);
565     #endif
566 #endif                     
567 #if CMK_SMP
568     SendMsgToPeers(nbytes, msg, 1);                   
569     CmiFree(msg);
570 #else
571     /* nonsmp case */
572     CmiPushPE(0, msg);
573 #endif
574 }
575
576 #if CMK_NODE_QUEUE_AVAILABLE
577 /* Process a node-level broadcast message */
578 static void ProcessNodeBroadcastMsg(char *msg){
579     int nbytes = CMI_MSG_SIZE(msg);
580 #if CMK_BROADCAST_SPANNING_TREE
581     SendSpanningChildrenNode(nbytes, msg);
582 #elif CMK_BROADCAST_HYPERCUBE
583     SendHypercubeNode(nbytes, msg);
584 #endif            
585     CmiPushNode(msg);
586 }
587 #endif
588
589 /* Pull msgs from two queues, this function should not be called from cmpl_hdlr thread */
590 static void ProcessBroadcastMsg(int pullRank){
591     char *msg;
592     do{
593         msg = PCQueuePop(CpvAccessOther(procBcastQ, pullRank));
594         if(msg) {
595             MACHSTATE2(4, "[%p]: process a proc-level bcast msg %p begin{", CmiGetState(), msg);
596             ProcessProcBroadcastMsg(msg);
597             MACHSTATE2(4, "[%p]: process a proc-level bcast msg %p end}", CmiGetState(), msg);
598         }else{ 
599             break;
600         }
601     }while (1);
602 #if CMK_NODE_QUEUE_AVAILABLE
603     do{
604         msg = PCQueuePop(CsvAccess(nodeBcastQ));
605         if(msg) {
606             MACHSTATE2(4, "[%p]: process a node-level bcast msg %p begin{", CmiGetState(), msg);
607             ProcessNodeBroadcastMsg(msg);
608             MACHSTATE2(4, "[%p]: process a node-level bcast msg %p end}", CmiGetState(), msg);
609         }else{ 
610             break;
611         }
612     }while (1);    
613 #endif
614 }
615
616
617 #if ENSURE_MSG_PAIRORDER
618 /* return 1 if this msg is an out-of-order incoming message */
619
620 /**
621  * Returns 1 if this "msg" is an out-of-order message, or 
622  * this "msg" is a late message which triggers the process 
623  * of all buffered ooo msgs. 
624  * --Chao Mei 
625  */
626 static int CheckMsgInOrder(char *msg, MsgOrderInfo *info){
627     int srcpe, destrank; 
628     int incomingSeqNo, expectedSeqNo;
629     int curOffset, maxOffset;
630     int i, curWinSize;
631     void **destMsgBuffer = NULL;
632
633     /* numMsg is the number of msgs to be processed in this buffer*/
634     /* Reason to have this extra copy of msgs to be processed: Reduce the atomic granularity */
635     void **toProcessMsgBuffer;
636     int numMsgs = 0;    
637  
638     srcpe = CMI_MSG_SRCPE(msg);
639     destrank = CMI_DEST_RANK(msg);  
640     incomingSeqNo = CMI_MSG_SEQNO(msg);
641     
642     CmiLock(cmplHdlrThdLock);
643
644     expectedSeqNo = getNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe);
645     if(expectedSeqNo == incomingSeqNo){
646         /* Two cases: has ooo msg buffered or not */
647         maxOffset = (info->oooMaxOffset)[srcpe];
648         if(maxOffset>0) {
649             MACHSTATE1(4, "Processing all buffered ooo msgs (maxOffset=%d) including the just recved begin {", maxOffset);
650             curWinSize = info->CUR_WINDOW_SIZE[srcpe];
651             toProcessMsgBuffer = malloc((curWinSize+1)*sizeof(void *));
652             /* process the msg just recved */
653             toProcessMsgBuffer[numMsgs++] = msg;            
654             /* process the buffered ooo msg until the first empty slot in the window */
655             destMsgBuffer = (info->oooMsgBuffer)[srcpe];                       
656             for(curOffset=0; curOffset<maxOffset; curOffset++) {
657                 char *curMsg = destMsgBuffer[curOffset];
658                 if(curMsg == NULL){
659                     CmiAssert(curOffset!=(maxOffset-1));                                        
660                     break;
661                 }
662                 toProcessMsgBuffer[numMsgs++] = curMsg;
663                 destMsgBuffer[curOffset] = NULL;
664             }            
665             /* Update expected seqno, maxOffset and slide the window */
666             if(curOffset < maxOffset) {
667                 int i;
668                 /** 
669                  * now, the seqno of the next to-be-recved msg should be 
670                  * "expectedSeqNo+curOffset+1" as the seqno of the just 
671                  * processed msg is "expectedSeqNo+curOffset. We need to slide 
672                  * the msg buffer window from "curOffset+1" because the first 
673                  * element of the buffer window should always points to the ooo 
674                  * msg that's 1 in terms of seqno ahead of the next to-be-recved 
675                  * msg. --Chao Mei 
676                  */                
677                 
678                 /* moving [curOffset+1, maxOffset) to [0, maxOffset-curOffset-1) in the window */
679                 /* The following two loops could be combined --Chao Mei */
680                 for(i=0; i<maxOffset-curOffset-1; i++){
681                     destMsgBuffer[i] = destMsgBuffer[curOffset+i+1];
682                 }
683                 for(i=maxOffset-curOffset-1; i<maxOffset; i++) {
684                     destMsgBuffer[i] = NULL;
685                 }
686                 (info->oooMaxOffset)[srcpe] = maxOffset-curOffset-1;
687                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+curOffset);
688             }else{
689                 /* there's no remaining buffered ooo msgs */
690                 (info->oooMaxOffset)[srcpe] = 0;
691                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+maxOffset);
692             }
693
694             CmiUnlock(cmplHdlrThdLock);
695                         
696             /* Process the msgs */
697             for(i=0; i<numMsgs; i++){
698                 char *curMsg = toProcessMsgBuffer[i];                
699                 if(CMI_BROADCAST_ROOT(curMsg)>0) {
700                                         
701                 #if DECOUPLE_BCAST_PROCESS
702                     PCQueuePush(CpvAccessOther(procBcastQ, 0), curMsg);
703                 #else
704                     ProcessProcBroadcastMsg(curMsg);
705                 #endif
706
707                 }else{
708                     CmiPushPE(CMI_DEST_RANK(curMsg), curMsg);
709                 }
710             }
711
712             free(toProcessMsgBuffer);
713
714             MACHSTATE1(4, "Processing all buffered ooo msgs (actually processed %d) end }", curOffset);
715             /** 
716              * Since we have processed all buffered ooo msgs including 
717              * this just recved one, 1 should be returned so that this 
718              * msg no longer needs processing 
719              */
720             return 1;
721         }else{
722             /* An expected msg recved without any ooo msg buffered */
723             MACHSTATE1(4, "Receiving an expected msg with seqno=%d\n", incomingSeqNo);
724             setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo);
725
726             CmiUnlock(cmplHdlrThdLock);
727             return 0;
728         }        
729     }
730
731     MACHSTATE2(4, "Receiving an out-of-order msg with seqno=%d, but expect seqno=%d", incomingSeqNo, expectedSeqNo);
732     curWinSize = info->CUR_WINDOW_SIZE[srcpe];
733     if((info->oooMsgBuffer)[srcpe]==NULL) {        
734         (info->oooMsgBuffer)[srcpe] = malloc(curWinSize*sizeof(void *));
735         memset((info->oooMsgBuffer)[srcpe], 0, curWinSize*sizeof(void *));
736     }
737     destMsgBuffer = (info->oooMsgBuffer)[srcpe];
738     curOffset = incomingSeqNo - expectedSeqNo;
739     maxOffset = (info->oooMaxOffset)[srcpe];
740     if(curOffset<0) {
741         /* It's possible that the seqNo starts with another round (exceeding MAX_MSG_SEQNO) with 1 */
742         curOffset += MAX_MSG_SEQNO;
743     }
744     if(curOffset > curWinSize) {
745         int newWinSize;
746         if(curOffset > MAX_WINDOW_SIZE) {
747             CmiAbort("Exceeding the MAX_WINDOW_SIZE!\n");
748         }
749         newWinSize = ((curOffset/curWinSize)+1)*curWinSize;
750         /*CmiPrintf("[%d]: WARNING: INCREASING WINDOW SIZE FROM %d TO %d\n", CmiMyPe(), curWinSize, newWinSize);*/
751         (info->oooMsgBuffer)[srcpe] = malloc(newWinSize*sizeof(void *));
752         memset((info->oooMsgBuffer)[srcpe], 0, newWinSize*sizeof(void *));
753         memcpy((info->oooMsgBuffer)[srcpe], destMsgBuffer, curWinSize*sizeof(void *));
754         info->CUR_WINDOW_SIZE[srcpe] = newWinSize;
755         free(destMsgBuffer);
756         destMsgBuffer = (info->oooMsgBuffer)[srcpe];
757     }    
758     CmiAssert(destMsgBuffer[curOffset-1] == NULL);
759     destMsgBuffer[curOffset-1] = msg;
760     if(curOffset > maxOffset) (info->oooMaxOffset)[srcpe] = curOffset;
761
762     CmiUnlock(cmplHdlrThdLock);
763     return 1;
764 }
765
766 #endif
767 /* ======End of helper functions for processing an incoming (network) message ======*/
768
769 /* ======Begining of lapi callbacks such as the completion handler on the sender and recver side ======*/
770 /** 
771   * lapi completion handler on the recv side. It's responsible to push messages 
772   * to the destination proc or relay broadcast messages. --Chao Mei 
773   *  
774   * Note: The completion handler could be executed on any cores within a node ??? 
775   * So in SMP mode when there's a comm thread, the completion handler should be carefully 
776   * dealt with. 
777   *  
778   * Given lapi also provides an internal lapi thread to deal with network progress which 
779   * will call this function (???), we should be careful with the following situations: 
780   * 1) non SMP mode, with interrupt (lapi internal completion thread) 
781   * 2) non SMP mode, with polling (machine layer is responsible for network progress) 
782   * 3) SMP mode, no comm thread, with polling 
783   * 4) SMP mode, no comm thread, with interrupt 
784   * 5) SMP mode, with comm thread, with polling (not yet implemented, comm server is empty right now)
785   * 6) SMP mode, with comm thread, with interrupt?? 
786   *  
787   * Currently, SMP mode without comm thread is undergoing implementation. 
788   *  
789   * This function is executed by LAPI internal threads. It seems that the number of internal 
790   * completion handler threads could vary during the program. LAPI adaptively creates more 
791   * threads if there are more outstanding messages!!!! This means pcqueue needs protection 
792   * even in the nonsmp case!!!!
793   *  
794   * --Chao Mei 
795   */
796 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info) {
797     int i;
798     char *msg = am_info;
799     int broot, destrank;
800
801     MACHSTATE3(2,"[%p] PumpMsgsComplete with msg %p (isImm=%d) begin {",CmiGetState(), msg, CmiIsImmediate(msg));    
802 #if ENSURE_MSG_PAIRORDER
803     MACHSTATE3(2,"msg %p info: srcpe=%d, seqno=%d", msg, CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
804 #endif
805     /**
806      * First, we check if the msg is a broadcast msg via spanning 
807      * tree. If it is, it needs to call SendSpanningChildren to 
808      * relay the broadcast, and then send the msg to every cores on 
809      * this node.
810      *  
811      * After the first check, we deal with normal messages. 
812      * --Chao Mei
813      */
814 /* It's the right place to relay the broadcast message */
815     /**
816      * 1. For in-order delivery, because this is the handler for 
817      * receiving a message, and we assume the cross-network msgs are 
818      * always delivered to the first proc (rank 0) of this node, we 
819      * select the srcpe of the bcast msgs and the next msg seq no 
820      * correspondingly. 
821      *  
822      * 2. TODO: checking the in-order delivery of p2p msgs!! 
823      *  
824      * --Chao Mei 
825      */
826 #if ENSURE_MSG_PAIRORDER 
827     broot = CMI_BROADCAST_ROOT(msg);
828     destrank = CMI_DEST_RANK(msg);
829     /* Only check proc-level msgs */
830     if (broot>=0
831 #if CMK_NODE_QUEUE_AVAILABLE
832         && destrank != DGRAM_NODEMESSAGE
833 #endif
834     )
835     {
836         MsgOrderInfo *info;        
837         if(broot>0){
838             info = &CpvAccessOther(bcastMsgSeqInfo, destrank);
839             MACHSTATE1(2, "Check msg in-order for bcast msg %p", msg);
840         } else {
841             info = &CpvAccessOther(p2pMsgSeqInfo, destrank);
842             MACHSTATE1(2, "Check msg in-order for p2p msg %p", msg);
843         }
844         if(CheckMsgInOrder(msg,info)) {
845             MACHSTATE(2,"} PumpMsgsComplete end ");
846             return;
847         }
848     }
849 #endif    
850
851 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
852     if (CMI_BROADCAST_ROOT(msg)>0) {
853         MACHSTATE2(2,"[%p] Recved a proc-level broadcast msg %p",CmiGetState(), msg);     
854                 
855     #if DECOUPLE_BCAST_PROCESS
856         PCQueuePush(CpvAccessOther(procBcastQ, 0), msg);
857     #else
858         ProcessProcBroadcastMsg(msg);
859     #endif
860
861         MACHSTATE(2,"} PumpMsgsComplete end ");
862         return;
863     }
864
865 #if CMK_NODE_QUEUE_AVAILABLE
866     if(CMI_BROADCAST_ROOT(msg) < 0) {
867         MACHSTATE1(2,"[%p] Recved a node-level broadcast msg",CmiGetState());  
868         
869     #if DECOUPLE_BCAST_PROCESS
870         PCQueuePush(CsvAccess(nodeBcastQ), msg);
871     #else
872         ProcessNodeBroadcastMsg(msg);
873     #endif
874
875         MACHSTATE(2,"} PumpMsgsComplete end ");
876         return;
877     }
878 #endif
879
880 #endif
881
882 #if CMK_NODE_QUEUE_AVAILABLE
883     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
884         CmiPushNode(msg);
885     else{
886         MACHSTATE3(2,"[%p] Recv a p2p msg from pe=%d with seq no=%d", CmiGetState(), CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
887         CmiPushPE(CMI_DEST_RANK(msg), msg);
888     }
889 #else
890     CmiPushPE(CMI_DEST_RANK(msg), msg);
891 #endif
892
893     MACHSTATE(2,"} PumpMsgsComplete end ");
894     return;
895 }
896
897 /** lapi header handler: executed on the recv side, when the
898  *  first packet of the recving msg arrives, it is called to
899  *  prepare the memory buffer in the user space for recving the
900  *  data --Chao Mei
901  */
902 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
903                            void *hdr, uint *uhdr_len,
904                            lapi_return_info_t *msg_info,
905                            compl_hndlr_t **comp_h, void **comp_am_info) {
906     void *msg_buf;
907     MACHSTATE1(2,"[%p] PumpMsgsBegin begin {",CmiGetState());
908     /* prepare the space for receiving the data, set the completion handler to
909        be executed inline */
910     msg_buf = (void *)CmiAlloc(msg_info->msg_len);
911
912     msg_info->ret_flags = LAPI_SEND_REPLY;
913     *comp_h = PumpMsgsComplete;
914     *comp_am_info = msg_buf;
915     MACHSTATE(2,"} PumpMsgsBegin end");
916     return msg_buf;
917
918 }
919
920 /* The following two are lapi sender handlers */
921 static void ReleaseMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
922     MACHSTATE2(2,"[%p] ReleaseMsg begin %p {",CmiGetState(),msg);
923     check_lapi_err(info->reason, "ReleaseMsg", __LINE__);
924     CmiFree(msg);
925     MACHSTATE(2,"} ReleaseMsg end");
926 }
927
928 static void DeliveredMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
929     MACHSTATE1(2,"[%p] DeliveredMsg begin {",CmiGetState());
930     check_lapi_err(info->reason, "DeliveredMsg", __LINE__);
931     *((int *)msg) = *((int *)msg) - 1;
932     MACHSTATE(2,"} DeliveredMsg end");
933 }
934 /* ======End of lapi callbacks such as the completion handler on the sender and recver side ======*/
935
936
937 #if CMK_NODE_QUEUE_AVAILABLE
938 char *CmiGetNonLocalNodeQ(void) {
939     CmiState cs = CmiGetState();
940     char *result = 0;
941     CmiIdleLock_checkMessage(&cs->idle);
942     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
943         MACHSTATE2(3,"[%p] CmiGetNonLocalNodeQ begin %d {",CmiGetState(),CmiMyPe());
944         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
945         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
946         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
947         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
948     }
949     return result;
950 }
951 #endif
952
953 void *CmiGetNonLocal(void) {    
954     CmiState cs = CmiGetState();
955     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
956     CmiIdleLock_checkMessage(&cs->idle); 
957        
958 #if DECOUPLE_BCAST_PROCESS
959 #if !CMK_SMP
960     ProcessBroadcastMsg(0);
961 #elif CMK_SMP_NO_COMMTHD
962     if(CmiMyRank()==0) ProcessBroadcastMsg(0);
963 #endif
964 #endif
965            
966     void *msg = PCQueuePop(cs->recv);
967     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);    
968     return msg;
969
970 }
971
972 /**
973  * TODO: What will be the effects if calling LAPI_Probe in the 
974  * interrupt mode??? --Chao Mei 
975  */
976
977 /* user call to handle immediate message, since there is no ServerThread polling
978    messages (lapi does all the polling) every thread is authorized to process
979    immediate messages. If we are not in lapiInterruptMode check for progress.
980 */
981 void CmiMachineProgressImpl(){
982     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
983
984 #if CMK_IMMEDIATE_MSG
985     MACHSTATE1(2, "[%p] Handling Immediate Message begin {",CmiGetState());
986     CmiHandleImmediate();
987     MACHSTATE1(2, "[%p] Handling Immediate Message end }",CmiGetState());
988 #endif
989
990 #if CMK_SMP && !CMK_SMP_NO_COMMTHD && DECOUPLE_BCAST_PROCESS
991     if(CmiMyRank()==CmiMyNodeSize()) ProcessBroadcastMsg(0);
992 #endif
993 }
994
995 /*TODO: does lapi provide any Barrrier related functions as DCMF provides??? --Chao Mei */
996 /* Barrier needs to be implemented!!! -Chao Mei */
997 /* These two barriers are only needed by CmiTimerInit to synchronize all the
998    threads. They do not need to provide a general barrier. */
999 int CmiBarrier() {
1000     return 0;
1001 }
1002 int CmiBarrierZero() {
1003     return 0;
1004 }
1005
1006 /********************* MESSAGE SEND FUNCTIONS ******************/
1007
1008 /** 
1009  * "deliverable": used to know if the message can be encoded 
1010  * into the destPE queue withtout duplication (for usage of 
1011  * SMP). If it has already been duplicated (and therefore is 
1012  * deliverable), we do not want to copy it again, while if it 
1013  * has been copied we must do it before enqueuing it. 
1014  *  
1015  * The general send function for all Cmi send functions.
1016  */
1017 void lapiSendFn(int destPE, int size, char *msg, scompl_hndlr_t *shdlr, void *sinfo, int deliverable) {
1018     /* CmiState cs = CmiGetState(); */
1019     CmiUInt2  rank, node;
1020     lapi_xfer_t xfer_cmd;
1021
1022     MACHSTATE3(2,"lapiSendFn to destPE=%d with msg %p (isImm=%d) begin {",destPE,msg, CmiIsImmediate(msg));
1023     MACHSTATE3(2, "inside lapiSendFn 1: size=%d, sinfo=%p, deliverable=%d", size, sinfo, deliverable);
1024     
1025 #if ENSURE_MSG_PAIRORDER
1026     MACHSTATE3(2, "inside lapiSendFn 2: msg src->dest (%d->%d), seqno=%d", CMI_MSG_SRCPE(msg), destPE, CMI_MSG_SEQNO(msg));
1027 #endif
1028
1029     node = CmiNodeOf(destPE);
1030     /** 
1031      *  The rank of the msg should be set before calling
1032      *  lapiSendFn!!
1033      *  The rank could be DGRAM_NODEMESSAGE which indicates
1034      *  a node-level message.
1035      */
1036 #if CMK_SMP
1037     /*CMI_DEST_RANK(msg) = rank;*/
1038     if (node == CmiMyNode())  {
1039         rank = CmiRankOf(destPE);
1040         MACHSTATE2(2,"[%p] inside lapiSendFn for intra-node message (%p)",CmiGetState(), msg);
1041         if (deliverable) {
1042             CmiPushPE(rank, msg);
1043             /* the acknowledge of delivery must not be called */
1044         } else {
1045             CmiPushPE(rank, CopyMsg(msg, size));
1046             /* acknowledge that the message has been delivered */
1047             lapi_sh_info_t lapiInfo;
1048             lapiInfo.src = node;
1049             lapiInfo.reason = LAPI_SUCCESS;
1050             (*shdlr)(&lapiContext, sinfo, &lapiInfo);
1051         }
1052         return;
1053     }
1054 #endif
1055     
1056     MACHSTATE2(2, "Ready to call LAPI_Xfer with destPe=%d, destRank=%d",destPE,CMI_DEST_RANK(msg));
1057
1058     xfer_cmd.Am.Xfer_type = LAPI_AM_XFER;
1059     xfer_cmd.Am.flags     = 0;
1060     xfer_cmd.Am.tgt       = node;
1061     xfer_cmd.Am.hdr_hdl   = lapiHeaderHandler;
1062     xfer_cmd.Am.uhdr_len  = 0;
1063     xfer_cmd.Am.uhdr      = NULL;
1064     xfer_cmd.Am.udata     = msg;
1065     xfer_cmd.Am.udata_len = size;
1066     xfer_cmd.Am.shdlr     = shdlr;
1067     xfer_cmd.Am.sinfo     = sinfo;
1068     xfer_cmd.Am.tgt_cntr  = NULL;
1069     xfer_cmd.Am.org_cntr  = NULL;
1070     xfer_cmd.Am.cmpl_cntr = NULL;
1071
1072     check_lapi(LAPI_Xfer,(lapiContext, &xfer_cmd));
1073
1074     MACHSTATE(2,"} lapiSendFn end");
1075 }
1076
1077 static void CmiSendSelf(char *msg) {
1078     MACHSTATE1(3,"[%p] Sending itself a message {",CmiGetState());
1079
1080 #if CMK_IMMEDIATE_MSG
1081     if (CmiIsImmediate(msg)) {
1082         MACHSTATE1(3, "[%p] Push Immediate Message begin {",CmiGetState());        
1083         CmiPushImmediateMsg(msg);
1084         MACHSTATE1(3, "[%p] Push Immediate Message end }",CmiGetState());        
1085         return;
1086     }
1087 #endif
1088     CQdCreate(CpvAccess(cQdState), 1);
1089     CdsFifo_Enqueue(CmiGetState()->localqueue,msg);
1090
1091     MACHSTATE(3,"} Sending itself a message");
1092 }
1093
1094 void CmiSyncSendFn(int destPE, int size, char *msg) {
1095     CmiState cs = CmiGetState();
1096     char *dupmsg = CopyMsg(msg, size);
1097
1098     MACHSTATE1(3,"[%p] Sending sync message begin {",CmiGetState());
1099     CMI_BROADCAST_ROOT(dupmsg) = 0;
1100     CMI_DEST_RANK(dupmsg) = CmiRankOf(destPE);
1101
1102     if (cs->pe==destPE) {
1103         CmiSendSelf(dupmsg);
1104     } else {
1105     #if ENSURE_MSG_PAIRORDER
1106         CMI_MSG_SRCPE(dupmsg) = CmiMyPe();        
1107         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1108         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(dupmsg));
1109     #endif
1110
1111     #if ENABLE_CONVERSE_QD
1112         CQdCreate(CpvAccess(cQdState), 1);
1113     #endif
1114         lapiSendFn(destPE, size, dupmsg, ReleaseMsg, dupmsg, 1);
1115     }
1116     MACHSTATE(3,"} Sending sync message end");
1117 }
1118
1119 int CmiAsyncMsgSent(CmiCommHandle handle) {
1120     return (*((int *)handle) == 0)?1:0;
1121 }
1122
1123 void CmiReleaseCommHandle(CmiCommHandle handle) {
1124 #ifndef CMK_OPTIMIZE
1125     if (*((int *)handle) != 0) CmiAbort("Released a CmiCommHandle not free!");
1126 #endif
1127     free(handle);
1128 }
1129
1130 /* the CmiCommHandle returned is a pointer to the location of an int. When it is
1131    set to 1 the message is available. */
1132 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
1133     MACHSTATE1(3,"[%p] Sending async message begin {",CmiGetState());
1134     void *handle;
1135     CmiState cs = CmiGetState();
1136     CMI_BROADCAST_ROOT(msg) = 0;
1137     CMI_DEST_RANK(msg) = CmiRankOf(destPE);
1138
1139     /* if we are the destination, send ourself a copy of the message */
1140     if (cs->pe==destPE) {
1141         CmiSendSelf(CopyMsg(msg, size));
1142         MACHSTATE(3,"} Sending async message end");
1143         return 0;
1144     }
1145
1146     handle = malloc(sizeof(int));
1147     *((int *)handle) = 1;
1148
1149 #if ENSURE_MSG_PAIRORDER
1150     CMI_MSG_SRCPE(msg) = CmiMyPe();    
1151     CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1152     setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
1153 #endif
1154
1155 #if ENABLE_CONVERSE_QD
1156     CQdCreate(CpvAccess(cQdState), 1);
1157 #endif
1158     lapiSendFn(destPE, size, msg, DeliveredMsg, handle, 0);
1159     /* the message may have been duplicated and already delivered if we are in SMP
1160        mode and the destination is on the same node, but there is no optimized
1161        check for that. */
1162     MACHSTATE(3,"} Sending async message end");
1163     return handle;
1164 }
1165
1166 void CmiFreeSendFn(int destPE, int size, char *msg) {
1167     MACHSTATE1(3,"[%p] Sending sync free message begin {",CmiGetState());
1168     CmiState cs = CmiGetState();
1169     CMI_BROADCAST_ROOT(msg) = 0;
1170     CMI_DEST_RANK(msg) = CmiRankOf(destPE);
1171
1172     if (cs->pe==destPE) {
1173         CmiSendSelf(msg);
1174     } else {
1175     #if ENSURE_MSG_PAIRORDER
1176         CMI_MSG_SRCPE(msg) = CmiMyPe();        
1177         CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
1178         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
1179     #endif
1180     #if ENABLE_CONVERSE_QD
1181         CQdCreate(CpvAccess(cQdState), 1);
1182     #endif        
1183         lapiSendFn(destPE, size, msg, ReleaseMsg, msg, 1);
1184         /*CmiAsyncSendFn(destPE, size, msg);*/
1185     }
1186     MACHSTATE(3,"} Sending sync free message end");
1187 }
1188
1189 /* ===========Node level p2p send functions============= */
1190 #if CMK_NODE_QUEUE_AVAILABLE
1191 static void CmiSendNodeSelf(char *msg) {
1192     CmiState cs;
1193     MACHSTATE1(3,"[%p] Sending itself a node message {",CmiGetState());
1194
1195 #if CMK_IMMEDIATE_MSG
1196     if (CmiIsImmediate(msg)) {
1197         MACHSTATE1(3, "[%p] Push Immediate Message {",CmiGetState());
1198         CMI_DEST_RANK(msg) = 0;
1199         CmiPushImmediateMsg(msg);
1200         MACHSTATE(3, "} Push Immediate Message end");
1201         return;
1202     }
1203 #endif
1204     CQdCreate(CpvAccess(cQdState), 1);
1205     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1206     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1207     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1208
1209     cs=CmiGetStateN(0);
1210     CmiIdleLock_addMessage(&cs->idle);
1211
1212     MACHSTATE(3,"} Sending itself a node message");
1213 }
1214
1215 /*TODO: not sure whether the in-order delivery affects for node messages?? --Chao Mei */
1216 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
1217     char *dupmsg = CopyMsg(msg, size);
1218
1219     MACHSTATE1(3,"[%p] Sending sync node message begin {",CmiGetState());
1220     CMI_BROADCAST_ROOT(dupmsg) = 0;
1221     CMI_DEST_RANK(dupmsg) = DGRAM_NODEMESSAGE;
1222
1223     if (CmiMyNode()==destNode) {
1224         CmiSendNodeSelf(dupmsg);
1225     } else {
1226     #if ENABLE_CONVERSE_QD
1227         CQdCreate(CpvAccess(cQdState), 1);
1228     #endif
1229         lapiSendFn(CmiNodeFirst(destNode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1230     }
1231     MACHSTATE(3,"} Sending sync node message end");
1232 }
1233
1234 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
1235     void *handle;
1236     CMI_BROADCAST_ROOT(msg) = 0;
1237     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1238
1239     MACHSTATE1(3,"[%p] Sending async node message begin {",CmiGetState());
1240     /* if we are the destination, send ourself a copy of the message */
1241     if (CmiMyNode()==destNode) {
1242         CmiSendNodeSelf(CopyMsg(msg, size));
1243         MACHSTATE(3,"} Sending async node message end");
1244         return 0;
1245     }
1246
1247     handle = malloc(sizeof(int));
1248     *((int *)handle) = 1;
1249
1250 #if ENABLE_CONVERSE_QD
1251     CQdCreate(CpvAccess(cQdState), 1);
1252 #endif
1253     lapiSendFn(CmiNodeFirst(destNode), size, msg, DeliveredMsg, handle, 0);
1254     /* the message may have been duplicated and already delivered if we are in SMP
1255        mode and the destination is on the same node, but there is no optimized
1256        check for that. */
1257     MACHSTATE(3,"} Sending async node message end");
1258     return handle;
1259
1260 }
1261
1262 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
1263     CMI_BROADCAST_ROOT(msg) = 0;
1264     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1265
1266     MACHSTATE1(3,"[%p] Sending sync free node message begin {",CmiGetState());
1267     if (CmiMyNode()==destNode) {
1268         CmiSendNodeSelf(msg);
1269     } else {
1270     #if ENABLE_CONVERSE_QD
1271         CQdCreate(CpvAccess(cQdState), 1);
1272     #endif
1273         lapiSendFn(CmiNodeFirst(destNode), size, msg, ReleaseMsg, msg, 1);
1274     }
1275     MACHSTATE(3,"} Sending sync free node message end");
1276 }
1277 #endif
1278
1279 /*********************** BROADCAST FUNCTIONS **********************/
1280 #if CMK_SMP
1281 /** 
1282   * Sending msgs to cores in the same node 
1283   * "includeSelf" indicates whether the msg should be sent to 
1284   * the proc of rank CMI_DEST_RANK(msg) 
1285   * --Chao mei 
1286   *  
1287   */
1288 void SendMsgToPeers(int size, char *msg, int includeSelf){
1289     if(includeSelf) {
1290         int i;
1291         for(i=0; i<CmiMyNodeSize(); i++) {
1292             char *dupmsg = CopyMsg(msg,size);
1293             CmiPushPE(i,dupmsg);
1294         }
1295     }else{
1296         int i;
1297         int excludeRank = CMI_DEST_RANK(msg);
1298         for(i=excludeRank+1; i<CmiMyNodeSize(); i++) {
1299             char *dupmsg = CopyMsg(msg,size);
1300             CmiPushPE(i,dupmsg);
1301         }
1302         for(i=0; i<excludeRank; i++) {
1303             char *dupmsg = CopyMsg(msg,size);
1304             CmiPushPE(i,dupmsg);
1305         }
1306     }
1307 }
1308 #endif
1309
1310 /** 
1311  * SendSpanningChildren only sends inter-node messages. The 
1312  * intra-node messages are delivered via SendMsgToPeers if it is 
1313  * in SMP mode. 
1314  */
1315 #if ENSURE_MSG_PAIRORDER
1316 void SendSpanningChildren(int size, char *msg, int srcPe, int *seqNoArr){
1317 #else
1318 void SendSpanningChildren(int size, char *msg) {
1319 #endif
1320     int startproc = CMI_BROADCAST_ROOT(msg)-1;
1321     int startnode = CmiNodeOf(startproc);
1322     int i, rp;
1323     char *dupmsg;
1324
1325     CmiAssert(startnode>=0 && startnode<CmiNumNodes());
1326     
1327     MACHSTATE3(3, "[%p] SendSpanningChildren on proc=%d with start proc %d begin {",CmiGetState(), CmiMyPe(), startproc);  
1328     /* rp is the relative node id from start node */  
1329     rp = CmiMyNode() - startnode;
1330     if(rp<0) rp += CmiNumNodes();
1331     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1332         int p = BROADCAST_SPANNING_FACTOR*rp + i;
1333         if (p > CmiNumNodes() - 1) break;
1334         p += startnode;
1335         p = p%CmiNumNodes();
1336         
1337 #if CMK_BROADCAST_USE_CMIREFERENCE
1338         CmiReference(msg);
1339         lapiSendFn(CmiNodeFirst(p), size, msg, ReleaseMsg, msg, 0);
1340 #else
1341         dupmsg = CopyMsg(msg, size);
1342     #if ENSURE_MSG_PAIRORDER
1343         CMI_MSG_SRCPE(dupmsg) = srcPe;
1344         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(seqNoArr, CmiNodeFirst(p));
1345         setNextMsgSeqNo(seqNoArr, CmiNodeFirst(p), CMI_MSG_SEQNO(dupmsg));
1346     #endif
1347         lapiSendFn(CmiNodeFirst(p), size, dupmsg, ReleaseMsg, dupmsg, 1);
1348 #endif
1349     }    
1350
1351     MACHSTATE3(3, "[%p] SendSpanningChildren on proc=%d with start proc %d end }",CmiGetState(), CmiMyPe(), startproc);    
1352 }
1353
1354 /* Return the smallest integer that is larger than or equal to log2(i), e.g CmiLog2(14) = 4*/
1355 static int CmiLog2 (int i) {
1356     int m;
1357     for (m=0; i>(1<<m); ++m);
1358     return m;
1359 }
1360
1361 /* send msg along the hypercube in broadcast. (Chao Mei) */
1362 #if ENSURE_MSG_PAIRORDER
1363 void SendHypercube(int size, char *msg, int srcPe, int *seqNoArr){
1364 #else
1365 void SendHypercube(int size, char *msg) {
1366 #endif
1367     int i, tmp, cnt;   
1368     char *dupmsg;
1369     int dims = 0;
1370     int startproc = CMI_BROADCAST_ROOT(msg)-1;
1371     int startnode = CmiNodeOf(startproc);
1372     /* relative proc id to startnode */
1373     int rp = CmiMyNode() - startnode;    
1374     if(rp < 0) rp += CmiNumNodes();
1375
1376         /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1 */
1377         tmp = CmiNumNodes()-1;
1378         for(tmp=CmiNumNodes()-1; tmp>0; tmp=tmp>>1) dims++;
1379         if(CmiNumNodes()==1) dims=1;
1380
1381         cnt=0; tmp=rp;
1382         for(i=0; i<dims; i++, cnt++){
1383                 if(tmp & 1 == 1) break;
1384                 tmp = tmp >> 1;
1385         }
1386         
1387     MACHSTATE3(3, "[%p] SendHypercube on proc=%d with start proc %d begin {",CmiGetState(), CmiMyPe(), startproc);    
1388     for(i=cnt-1; i>=0; i--) { 
1389         /* destnode is still the relative node id from startnode */
1390         int destnode = rp + (1<<i);
1391         if(destnode > CmiNumNodes()-1) continue; 
1392         
1393         destnode += startnode;
1394         destnode = destnode % CmiNumNodes();
1395
1396         CmiAssert(destnode != CmiMyNode());
1397                
1398 #if CMK_BROADCAST_USE_CMIREFERENCE
1399         CmiReference(msg);
1400         lapiSendFn(CmiNodeFirst(destnode), size, msg, ReleaseMsg, msg, 0);
1401 #else
1402         dupmsg = CopyMsg(msg, size);
1403     #if ENSURE_MSG_PAIRORDER
1404         CMI_MSG_SRCPE(dupmsg) = srcPe;
1405         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(seqNoArr, CmiNodeFirst(destnode));
1406         setNextMsgSeqNo(seqNoArr, CmiNodeFirst(destnode), CMI_MSG_SEQNO(dupmsg));
1407     #endif
1408         lapiSendFn(CmiNodeFirst(destnode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1409 #endif
1410     }    
1411     MACHSTATE3(3, "[%p] SendHypercube on proc=%d with start proc %d end }",CmiGetState(), CmiMyPe(), startproc);    
1412 }
1413
1414 void CmiSyncBroadcastGeneralFn(int size, char *msg) {    /* ALL_EXCEPT_ME  */
1415     int i, rank;
1416     MACHSTATE3(3,"[%p] Sending sync broadcast message %p with size %d begin {",CmiGetState(), msg, size);
1417
1418 #if ENABLE_CONVERSE_QD
1419     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1420 #endif
1421
1422 #if CMK_BROADCAST_SPANNING_TREE
1423     CMI_BROADCAST_ROOT(msg) = CmiMyPe()+1;
1424     /** 
1425      * since the broadcast msg will be relayed separately on 
1426      * remote procs by SendSpanningChildren, the actual msg size 
1427      * needs to be recorded in the header for proper memory 
1428      * allocation. E.g. the bcast msg is relayed in PumpMsgComplete.
1429      * But at that time, the actual msg size is not known if the 
1430      * filed inside the msg header is not set. The unknown actual 
1431      * msg size will cause the relay of msg fail because CopyMsg 
1432      * doesn't have a correct msg size input!! -Chao Mei 
1433      */
1434     CMI_MSG_SIZE(msg) = size;
1435     /* node-aware spanning tree, so bcast msg is always delivered to the first core on each node */
1436     CMI_DEST_RANK(msg) = 0;
1437 #if ENSURE_MSG_PAIRORDER    
1438     SendSpanningChildren(size, msg, CmiMyPe(), CpvAccess(bcastMsgSeqInfo).nextMsgSeqNo);
1439 #else
1440     SendSpanningChildren(size, msg);
1441 #endif
1442         
1443 #if CMK_SMP
1444     CMI_DEST_RANK(msg) = CmiMyRank();
1445     SendMsgToPeers(size, msg, 0);
1446 #endif
1447
1448 #elif CMK_BROADCAST_HYPERCUBE 
1449    
1450     CMI_BROADCAST_ROOT(msg) = CmiMyPe()+1;
1451     CMI_MSG_SIZE(msg) = size;
1452     CMI_DEST_RANK(msg) = 0;    
1453
1454 #if ENSURE_MSG_PAIRORDER    
1455     SendHypercube(size, msg, CmiMyPe(), CpvAccess(bcastMsgSeqInfo).nextMsgSeqNo);
1456 #else
1457     SendHypercube(size, msg);
1458 #endif
1459       
1460 #if CMK_SMP
1461     CMI_DEST_RANK(msg) = CmiMyRank();
1462     SendMsgToPeers(size, msg, 0);
1463 #endif
1464
1465 #else
1466     CmiState cs = CmiGetState();
1467     char *dupmsg;
1468
1469     CMI_BROADCAST_ROOT(msg) = 0;
1470 #if CMK_BROADCAST_USE_CMIREFERENCE
1471     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1472         CmiReference(msg);
1473         lapiSendFn(i, size, msg, ReleaseMsg, msg, 0);
1474         /*CmiSyncSendFn(i, size, msg) ;*/
1475     }
1476     for (i=0; i<cs->pe; i++) {
1477         CmiReference(msg);
1478         lapiSendFn(i, size, msg, ReleaseMsg, msg, 0);
1479         /*CmiSyncSendFn(i, size,msg) ;*/
1480     }
1481 #else
1482 #if ENSURE_MSG_PAIRORDER
1483     CMI_MSG_SRCPE(msg) = CmiMyPe();
1484 #endif
1485     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1486         dupmsg = CopyMsg(msg, size);
1487         CMI_DEST_RANK(dupmsg) = CmiRankOf(i);
1488     #if ENSURE_MSG_PAIRORDER        
1489         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i);
1490         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i, CMI_MSG_SEQNO(dupmsg));
1491     #endif
1492         lapiSendFn(i, size, dupmsg, ReleaseMsg, dupmsg, 1);        
1493     }
1494     for (i=0; i<cs->pe; i++) {
1495         dupmsg = CopyMsg(msg, size);
1496     #if ENSURE_MSG_PAIRORDER        
1497         CMI_MSG_SEQNO(dupmsg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i);
1498         setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, i, CMI_MSG_SEQNO(dupmsg));
1499     #endif
1500         CMI_DEST_RANK(dupmsg) = CmiRankOf(i);
1501         lapiSendFn(i, size, dupmsg, ReleaseMsg, dupmsg, 1);        
1502     }
1503 #endif
1504 #endif
1505
1506     MACHSTATE(3,"} Sending sync broadcast message end");
1507 }
1508
1509 CmiCommHandle CmiAsyncBroadcastGeneralFn(int size, char *msg) {
1510 #if ENSURE_MSG_PAIRORDER
1511     /* Not sure how to add the msg seq no for async broadcast messages --Chao Mei */
1512     /* so abort here ! */
1513     CmiAssert(0);
1514     return 0;
1515 #else
1516     CmiState cs = CmiGetState();
1517     int i, rank;
1518 #if ENABLE_CONVERSE_QD
1519     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1520 #endif
1521     MACHSTATE1(3,"[%p] Sending async broadcast message from {",CmiGetState());
1522     CMI_BROADCAST_ROOT(msg) = 0;
1523     void *handle = malloc(sizeof(int));
1524     *((int *)handle) = CmiNumPes()-1;
1525
1526     for (i=cs->pe+1; i<CmiNumPes(); i++) {
1527         CMI_DEST_RANK(msg) = CmiRankOf(i);
1528         lapiSendFn(i, size, msg, DeliveredMsg, handle, 0);
1529     }
1530     for (i=0; i<cs->pe; i++) {
1531         CMI_DEST_RANK(msg) = CmiRankOf(i);
1532         lapiSendFn(i, size, msg, DeliveredMsg, handle, 0);
1533     }
1534
1535     MACHSTATE(3,"} Sending async broadcast message end");
1536     return handle;
1537 #endif
1538 }
1539
1540 void CmiSyncBroadcastFn(int size, char *msg) {
1541     /*CMI_DEST_RANK(msg) = 0;*/
1542     CmiSyncBroadcastGeneralFn(size, msg);
1543 }
1544
1545 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1546     /*CMI_DEST_RANK(msg) = 0;*/
1547     return CmiAsyncBroadcastGeneralFn(size, msg);
1548 }
1549
1550 void CmiFreeBroadcastFn(int size, char *msg) {
1551     CmiSyncBroadcastFn(size,msg);
1552     CmiFree(msg);
1553 }
1554
1555 void CmiSyncBroadcastAllFn(int size, char *msg) {       /* All including me */
1556     CmiSendSelf(CopyMsg(msg, size));
1557     CmiSyncBroadcastFn(size, msg);
1558 }
1559
1560 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1561     CmiSendSelf(CopyMsg(msg, size));
1562     return CmiAsyncBroadcastFn(size, msg);
1563 }
1564
1565 void CmiFreeBroadcastAllFn(int size, char *msg) {       /* All including me */
1566     CmiSendSelf(CopyMsg(msg, size));
1567     CmiSyncBroadcastFn(size, msg);
1568     CmiFree(msg);
1569 }
1570
1571 #if CMK_NODE_QUEUE_AVAILABLE
1572 void SendSpanningChildrenNode(int size, char *msg){    
1573     int startnode = -CMI_BROADCAST_ROOT(msg)-1;    
1574     int i, rp;
1575     char *dupmsg;
1576
1577     MACHSTATE3(2, "[%p] SendSpanningChildrenNode on node %d with startnode %d", CmiGetState(), CmiMyNode(), startnode);
1578     CmiAssert(startnode>=0 && startnode<CmiNumNodes());
1579         
1580     rp = CmiMyNode() - startnode;
1581     if (rp<0) rp+=CmiNumNodes();        
1582     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1583         int p = BROADCAST_SPANNING_FACTOR*rp + i;
1584         if (p > CmiNumNodes() - 1) break;
1585         p += startnode;
1586         p = p%CmiNumNodes();
1587         
1588 #if CMK_BROADCAST_USE_CMIREFERENCE
1589         CmiReference(msg);
1590         lapiSendFn(CmiNodeFirst(p), size, msg, ReleaseMsg, msg, 0);
1591 #else
1592         dupmsg = CopyMsg(msg, size);
1593         lapiSendFn(CmiNodeFirst(p), size, dupmsg, ReleaseMsg, dupmsg, 1);
1594 #endif
1595     }
1596     MACHSTATE3(3, "[%p] SendSpanningChildrenNode on node=%d with start node %d end }",CmiGetState(), CmiMyNode(), startnode);
1597 }
1598
1599 /* send msg along the hypercube in broadcast. (Chao Mei) */
1600 void SendHypercubeNode(int size, char *msg) {
1601     int i, dist, tmp, cnt;   
1602     char *dupmsg;
1603     int dims = 0;
1604     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1605     int rp = CmiMyNode() - startnode;    
1606     if(rp < 0) rp += CmiNumNodes();
1607     dist = rp;
1608
1609         /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1 */    
1610         tmp = CmiNumNodes()-1;
1611         for(tmp=CmiNumNodes()-1; tmp>0; tmp=tmp>>1) dims++;
1612         if(CmiNumNodes()==1) dims=1;
1613         
1614         cnt=0; tmp=rp;
1615         for(i=0; i<dims; i++, cnt++){
1616                 if(tmp & 1 == 1) break;
1617                 tmp = tmp >> 1;
1618         }
1619         
1620     MACHSTATE3(3, "[%p] SendHypercubeNode on node=%d with start node %d begin {",CmiGetState(), CmiMyNode(), startnode);
1621     for(i=cnt-1; i>=0; i--) {         
1622         int destnode = rp + (1<<i);
1623         if(destnode > CmiNumNodes()-1) continue;
1624         
1625         destnode += startnode;
1626         destnode = destnode % CmiNumNodes();        
1627                         
1628         CmiAssert(destnode != CmiMyNode());
1629
1630 #if CMK_BROADCAST_USE_CMIREFERENCE
1631         CmiReference(msg);
1632         lapiSendFn(CmiNodeFirst(destnode), size, msg, ReleaseMsg, msg, 0);
1633 #else
1634         dupmsg = CopyMsg(msg, size);
1635         lapiSendFn(CmiNodeFirst(destnode), size, dupmsg, ReleaseMsg, dupmsg, 1);
1636 #endif 
1637     }
1638     MACHSTATE3(3, "[%p] SendHypercubeNode on node=%d with start node %d end }",CmiGetState(), CmiMyNode(), startnode);  
1639 }
1640
1641 void CmiSyncNodeBroadcastGeneralFn(int size, char *msg) {    /* ALL_EXCEPT_THIS_NODE  */     
1642 #if ENABLE_CONVERSE_QD
1643     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1644 #endif
1645 #if CMK_BROADCAST_SPANNING_TREE
1646
1647     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use spanning tree) begin {",CmiGetState());
1648     CMI_BROADCAST_ROOT(msg) = -(CmiMyNode()+1);
1649     CMI_MSG_SIZE(msg) = size;
1650     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;    
1651     SendSpanningChildrenNode(size, msg);
1652     
1653 #elif CMK_BROADCAST_HYPERCUBE
1654
1655     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use Hypercube) begin {",CmiGetState());
1656     CMI_BROADCAST_ROOT(msg) = -(CmiMyNode()+1);
1657     CMI_MSG_SIZE(msg) = size;
1658     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;    
1659     SendHypercubeNode(size, msg);
1660     
1661 #else      
1662     char *dupmsg;
1663     int i;
1664     MACHSTATE1(3,"[%p] Sending sync node broadcast message (use p2p) begin {",CmiGetState());
1665
1666     CMI_BROADCAST_ROOT(msg) = 0;
1667     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1668     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {
1669         dupmsg = CopyMsg(msg, size);        
1670         lapiSendFn(CmiNodeFirst(i), size, dupmsg, ReleaseMsg, dupmsg, 1);        
1671     }
1672     for (i=0; i<CmiMyNode(); i++) {
1673         dupmsg = CopyMsg(msg, size);        
1674         lapiSendFn(CmiNodeFirst(i), size, dupmsg, ReleaseMsg, dupmsg, 1);        
1675     }
1676 #endif
1677     MACHSTATE(3,"} Sending sync node broadcast message end");
1678 }
1679
1680 CmiCommHandle CmiAsyncNodeBroadcastGeneralFn(int size, char *msg) {
1681     int i;
1682
1683 #if ENABLE_CONVERSE_QD
1684     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1685 #endif
1686
1687     MACHSTATE1(3,"[%p] Sending async node broadcast message from {",CmiGetState());
1688     CMI_BROADCAST_ROOT(msg) = 0;
1689     CMI_DEST_RANK(msg) =DGRAM_NODEMESSAGE;
1690     void *handle = malloc(sizeof(int));
1691     *((int *)handle) = CmiNumNodes()-1;
1692     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {        
1693         lapiSendFn(CmiNodeFirst(i), size, msg, DeliveredMsg, handle, 0);
1694     }
1695     for (i=0; i<CmiMyNode(); i++) {        
1696         lapiSendFn(CmiNodeFirst(i), size, msg, DeliveredMsg, handle, 0);
1697     }
1698
1699     MACHSTATE(3,"} Sending async broadcast message end");
1700     return handle;
1701 }
1702
1703 void CmiSyncNodeBroadcastFn(int size, char *msg) {
1704     /*CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;*/
1705     CmiSyncNodeBroadcastGeneralFn(size, msg);
1706 }
1707
1708 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
1709     /*CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;*/
1710     return CmiAsyncNodeBroadcastGeneralFn(size, msg);
1711 }
1712
1713 void CmiFreeNodeBroadcastFn(int size, char *msg) {
1714     CmiSyncNodeBroadcastFn(size, msg);
1715     CmiFree(msg);
1716 }
1717
1718 void CmiSyncNodeBroadcastAllFn(int size, char *msg) {
1719     CmiSendNodeSelf(CopyMsg(msg, size));
1720     CmiSyncNodeBroadcastFn(size, msg);
1721 }
1722
1723 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg) {
1724     CmiSendNodeSelf(CopyMsg(msg, size));
1725     return CmiAsyncNodeBroadcastFn(size, msg);
1726 }
1727
1728 void CmiFreeNodeBroadcastAllFn(int size, char *msg) {
1729     CmiSendNodeSelf(CopyMsg(msg, size));
1730     CmiSyncNodeBroadcastFn(size, msg);
1731     CmiFree(msg);
1732 }
1733 #endif
1734
1735 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1736 void CmiSyncListSendFn(int, int *, int, char*) {
1737
1738 }
1739
1740 CmiCommHandle CmiAsyncListSendFn(int, int *, int, char*) {
1741
1742 }
1743
1744 void CmiFreeListSendFn(int, int *, int, char*) {
1745
1746 }
1747 #endif
1748
1749 #if ! CMK_VECTOR_SEND_USES_COMMON_CODE
1750 void CmiSyncVectorSend(int, int, int *, char **) {
1751
1752 }
1753
1754 CmiCommHandle CmiAsyncVectorSend(int, int, int *, char **) {
1755
1756 }
1757
1758 void CmiSyncVectorSendAndFree(int, int, int *, char **) {
1759
1760 }
1761 #endif
1762
1763
1764 /************************** MAIN (non comm related functions) ***********************************/
1765
1766 void ConverseExit(void) {
1767     MACHSTATE2(2, "[%d-%p] entering ConverseExit begin {",CmiMyPe(),CmiGetState());
1768     
1769     /* TODO: Is it necessary to drive the network progress here?? -Chao Mei */
1770
1771     /* A barrier excluding the comm thread if present */
1772     CmiNodeBarrier();
1773
1774 #if CMK_SMP && !CMK_SMP_NO_COMMTHD
1775     /* Signal the comm thd to exit now! */
1776     if(CmiMyRank()==0) {
1777         commThdExit = 1;
1778     }
1779 #endif
1780
1781     ConverseCommonExit();
1782
1783 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1784     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
1785 #endif
1786
1787     CmiNodeBarrier();
1788
1789     MACHSTATE2(2, "[%d-%p] ConverseExit end }",CmiMyPe(),CmiGetState());
1790 #if CMK_SMP
1791     if(CmiMyRank()==0) {
1792         check_lapi(LAPI_Gfence, (lapiContext));
1793         check_lapi(LAPI_Term, (lapiContext));
1794         exit(EXIT_SUCCESS);
1795     }else{
1796         pthread_exit(NULL);
1797     }
1798 #else
1799     check_lapi(LAPI_Gfence, (lapiContext));      
1800     check_lapi(LAPI_Term, (lapiContext));
1801     exit(EXIT_SUCCESS);
1802 #endif
1803 }
1804
1805 /* Those be Cpved?? --Chao Mei */
1806 static char     **Cmi_argv;
1807 static CmiStartFn Cmi_startfn;   /* The start function */
1808 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1809
1810 /** 
1811  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
1812  *  When the proc is idle, the LAPI_Probe is called to make
1813  *  network progress.
1814  *  
1815  *  While in SMP mode, CmiNotifyStillIdle and CmiNotifyBeginIdle
1816  *  are used. Particularly, when idle, the frequency of calling
1817  *  lapi_probe (network progress) is given by "sleepMs"
1818  */
1819 void CmiNotifyIdle(void) {    
1820     AdvanceCommunication();
1821     CmiYield();
1822 }
1823
1824 typedef struct {
1825     int sleepMs; /*Milliseconds to sleep while idle*/
1826     int nIdles; /*Number of times we've been idle in a row*/
1827     CmiState cs; /*Machine state*/
1828 } CmiIdleState;
1829
1830 static CmiIdleState *CmiNotifyGetState(void) {
1831     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1832     s->sleepMs=0;
1833     s->nIdles=0;
1834     s->cs=CmiGetState();
1835     return s;
1836 }
1837
1838 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1839     s->sleepMs=0;
1840     s->nIdles=0;
1841 }
1842
1843 #define SPINS_BEFORE_SLEEP     20
1844
1845 static void CmiNotifyStillIdle(CmiIdleState *s) {
1846     MACHSTATE2(2,"[%p] still idle (%d) begin {",CmiGetState(),CmiMyPe());
1847     s->nIdles++;
1848     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1849         s->sleepMs+=2;
1850         if (s->sleepMs>10) s->sleepMs=10;
1851     }
1852     if (s->sleepMs>0) {
1853         MACHSTATE1(2,"idle sleep (%d) {",CmiMyPe());
1854         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1855         MACHSTATE1(2,"} idle sleep (%d)",CmiMyPe());
1856     }
1857
1858     AdvanceCommunication();
1859     
1860     MACHSTATE1(2,"still idle (%d) end }",CmiMyPe());
1861 }
1862
1863 #if MACHINE_DEBUG_LOG
1864 CpvDeclare(FILE *, debugLog);
1865 #endif
1866
1867
1868 #if ENSURE_MSG_PAIRORDER
1869 static void initMsgOrderInfo(MsgOrderInfo *info){
1870     int i;
1871     info->nextMsgSeqNo = malloc(CmiNumPes()*sizeof(int));
1872     memset(info->nextMsgSeqNo, 0, CmiNumPes()*sizeof(int));
1873     
1874     info->expectedMsgSeqNo = malloc(CmiNumPes()*sizeof(int));
1875     memset(info->expectedMsgSeqNo, 0, CmiNumPes()*sizeof(int));
1876     
1877     info->oooMsgBuffer = malloc(CmiNumPes()*sizeof(void **));
1878     memset(info->oooMsgBuffer, 0, CmiNumPes()*sizeof(void **));
1879     
1880     info->oooMaxOffset = malloc(CmiNumPes()*sizeof(unsigned char));
1881     memset(info->oooMaxOffset, 0, CmiNumPes()*sizeof(unsigned char));
1882
1883     info->CUR_WINDOW_SIZE = malloc(CmiNumPes()*sizeof(unsigned char));
1884     for(i=0; i<CmiNumPes(); i++) info->CUR_WINDOW_SIZE[i] = INIT_WINDOW_SIZE;
1885 }
1886 #endif
1887
1888 /* Only called from communication thread in SMP mode */
1889 static void CommunicationServer(int sleepTime) {
1890 #if CMK_SMP_NO_COMMTHD
1891     sleep(sleepTime);
1892 #else
1893     
1894     if(commThdExit) {
1895         MACHSTATE2(2, "[%d-%p] comm server exit begin {",CmiMyPe(),CmiGetState());
1896         ConverseCommonExit();        
1897         MACHSTATE2(2, "[%d-%p] comm server exit end }",CmiMyPe(),CmiGetState());
1898         pthread_exit(NULL);
1899     }
1900
1901     if(!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
1902
1903 #if CMK_IMMEDIATE_MSG
1904     /*MACHSTATE1(2, "[%p] Handling Immediate Message begin{",CmiGetState());*/
1905     CmiHandleImmediate();
1906     /*MACHSTATE1(2, "[%p] Handling Immediate Message end}",CmiGetState());*/
1907 #endif
1908
1909 #if DECOUPLE_BCAST_PROCESS
1910     /*MACHSTATE1(2, "[%p] Enter ProcessBroadcastMsg begin{",CmiGetState());*/
1911     ProcessBroadcastMsg(0);
1912     /*MACHSTATE1(2, "[%p] Enter ProcessBroadcastMsg end}",CmiGetState());*/
1913 #endif
1914
1915     /* sleep(sleepTime) */
1916 #endif
1917 }
1918
1919 static void ConverseRunPE(int everReturn) {
1920     CmiIdleState *s;
1921     char** CmiMyArgv;
1922     int i;
1923     CpvInitialize(void *,CmiLocalQueue);
1924     CpvInitialize(unsigned, networkProgressCount);
1925
1926     CpvInitialize(PCQueue, procBcastQ);
1927     CpvAccess(procBcastQ) = PCQueueCreate();
1928
1929 #if ENSURE_MSG_PAIRORDER
1930     CpvInitialize(MsgOrderInfo, p2pMsgSeqInfo);
1931     initMsgOrderInfo(&CpvAccess(p2pMsgSeqInfo));
1932
1933     CpvInitialize(MsgOrderInfo, bcastMsgSeqInfo); 
1934     initMsgOrderInfo(&CpvAccess(bcastMsgSeqInfo));    
1935 #endif
1936
1937 #if MACHINE_DEBUG_LOG
1938     {
1939         char ln[200];
1940         sprintf(ln,"debugLog.%d",CmiMyPe());
1941         CpvInitialize(FILE *, debugLog);
1942         CpvAccess(debugLog)=fopen(ln,"w");
1943     }
1944 #endif
1945
1946     /* To make sure cpvaccess is correct? -Chao Mei */
1947     CmiNodeAllBarrier();
1948
1949     /* Added by Chao Mei */
1950 #if CMK_SMP
1951     if(CmiMyRank()) {
1952         /* The master core of this node is already initialized */
1953         lapi_info_t info;
1954         int testnode, testnumnodes;
1955         memset(&info,0,sizeof(info));
1956         /*CpvInitialize(lapi_handle_t *, lapiContext);
1957         CpvAccess(lapiContext) = CpvAccessOther(lapiContext, 0)+CmiMyRank();
1958         CpvAccess(lapiContext) = CpvAccessOther(lapiContext, 0);
1959         */       
1960         MACHSTATE2(2, "My rank id=%d, lapicontext=%p", CmiMyRank(), &lapiContext);
1961
1962         check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, &testnode));
1963         check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, &testnumnodes));
1964
1965         MACHSTATE3(2, "My rank id=%d, Task id=%d, Num tasks=%d", CmiMyRank(), testnode, testnumnodes);
1966     }
1967 #endif
1968
1969
1970     MACHSTATE2(2, "[%d] ConverseRunPE (thread %p)",CmiMyRank(),CmiGetState());
1971         
1972     CmiNodeAllBarrier();
1973
1974     MACHSTATE(2, "After NodeBarrier in ConverseRunPE");
1975     
1976     CpvAccess(CmiLocalQueue) = CmiGetState()->localqueue;
1977
1978     if(CmiMyRank())
1979         CmiMyArgv=CmiCopyArgs(Cmi_argv);
1980     else
1981         CmiMyArgv=Cmi_argv;
1982
1983     CthInit(CmiMyArgv);
1984
1985     MACHSTATE(2, "After CthInit in ConverseRunPE");
1986
1987     ConverseCommonInit(CmiMyArgv);
1988
1989     MACHSTATE(2, "After ConverseCommonInit in ConverseRunPE");
1990
1991    /**
1992      * In SMP, the machine layer usually has one comm thd, and it is 
1993      * designed to be responsible for all network communication. So 
1994      * if there's no dedicated processor for the comm thread, it has 
1995      * to share a proc with a worker thread. In this scenario, 
1996      * the worker thread needs to yield for some time to give CPU 
1997      * time to comm thread. However, in current configuration, we 
1998      * will always dedicate one proc for the comm thd, therefore, 
1999      * such yielding scheme is not necessary.  Besides, avoiding 
2000      * this yielding scheme improves performance because worker 
2001      * thread doesn't need to yield and will be more responsive to 
2002      * incoming messages. So, we will always use CmiNotifyIdle 
2003      * instead. 
2004      *  
2005      * --Chao Mei
2006      */
2007 #if 0 && CMK_SMP
2008     s=CmiNotifyGetState();
2009     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2010     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2011 #else
2012     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2013 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
2014     /* If there's comm thread, then comm thd is responsible for advancing comm */
2015     if(!CsvAccess(lapiInterruptMode)) {
2016         CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn)AdvanceCommunication, NULL);
2017     }
2018 #endif
2019 #endif
2020
2021 #if CMK_IMMEDIATE_MSG
2022     /* Converse initialization finishes, immediate messages can be processed.
2023        node barrier previously should take care of the node synchronization */
2024     _immediateReady = 1;
2025 #endif
2026
2027     /* communication thread */
2028     if (CmiMyRank() == CmiMyNodeSize()) {
2029         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2030         MACHSTATE2(3, "[%p]: Comm thread on node %d is going to be a communication server", CmiGetState(), CmiMyNode());            
2031         while(1) CommunicationServer(5);        
2032     } else { /* worker thread */
2033         if (!everReturn) {
2034             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2035             MACHSTATE1(3, "[%p]: Worker thread is going to work", CmiGetState());
2036             if (Cmi_usrsched==0) CsdScheduler(-1);
2037             ConverseExit();
2038         }
2039     }
2040 }
2041
2042 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
2043     int n,i;
2044
2045     lapi_info_t info;
2046
2047 #if ENSURE_MSG_PAIRORDER
2048    cmplHdlrThdLock = CmiCreateLock();
2049 #endif
2050
2051     /* processor per node */
2052     /** 
2053      *  We have to determin the ppn at this point in order to create
2054      *  the corresponding number of lapiContext instances.
2055      */
2056 #if CMK_SMP
2057     CmiMyNodeSize() = 1;
2058     CmiGetArgInt(argv,"+ppn", &CmiMyNodeSize());
2059 #else
2060     if (CmiGetArgFlag(argv,"+ppn")) {
2061         CmiAbort("+ppn cannot be used in non SMP version!\n");
2062     }
2063 #endif
2064
2065     memset(&info,0,sizeof(info));
2066
2067     /* Register error handler (redundant?) -- added by Chao Mei*/
2068     info.err_hndlr = (LAPI_err_hndlr *)lapi_err_hndlr;
2069
2070     /* Indicates the number of completion handler threads to create */
2071     /* The number of completion hndlr thds will affect the atomic PCQueue operations!! */
2072     /* NOTE: num_compl_hndlr_thr is obsolete now! --Chao Mei */
2073     /* info.num_compl_hndlr_thr = 1; */
2074
2075     check_lapi(LAPI_Init,(&lapiContext, &info));
2076     
2077     /* It's a good idea to start with a fence,
2078        because packets recv'd before a LAPI_Init are just dropped. */
2079     check_lapi(LAPI_Gfence,(lapiContext));
2080
2081     check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, &CmiMyNode()));
2082     check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, &CmiNumNodes()));
2083
2084     /* Make polling as the default mode as real apps have better perf */
2085     CsvAccess(lapiInterruptMode) = 0;
2086     if(CmiGetArgFlag(argv,"+poll")) CsvAccess(lapiInterruptMode) = 0;
2087     if(CmiGetArgFlag(argv,"+nopoll")) CsvAccess(lapiInterruptMode) = 1;    
2088
2089     check_lapi(LAPI_Senv,(lapiContext, ERROR_CHK, lapiDebugMode));
2090     check_lapi(LAPI_Senv,(lapiContext, INTERRUPT_SET, CsvAccess(lapiInterruptMode)));
2091
2092     if(CmiMyNode()==0) {
2093         printf("Running lapi in interrupt mode: %d\n", CsvAccess(lapiInterruptMode));
2094         printf("Running lapi with %d completion handler threads.\n", info.num_compl_hndlr_thr);
2095     }
2096
2097     /** 
2098      *  Associate PumpMsgsBegin with var "lapiHeaderHandler". Then inside Xfer calls,
2099      *  lapiHeaderHandler could be used to indicate the callback
2100      *  instead of PumpMsgsBegin --Chao Mei
2101      */
2102     check_lapi(LAPI_Addr_set,(lapiContext,(void *)PumpMsgsBegin,lapiHeaderHandler));
2103
2104     CmiNumPes() = CmiNumNodes() * CmiMyNodeSize();
2105     Cmi_nodestart = CmiMyNode() * CmiMyNodeSize();
2106
2107     Cmi_argv = argv;
2108     Cmi_startfn = fn;
2109     Cmi_usrsched = usched;
2110
2111     if (CmiGetArgFlag(argv,"++debug")) {  /*Pause so user has a chance to start and attach debugger*/
2112         printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
2113         if (!CmiGetArgFlag(argv,"++debug-no-pause"))
2114             sleep(30);
2115     }
2116
2117     CsvInitialize(CmiNodeState, NodeState);
2118     CmiNodeStateInit(&CsvAccess(NodeState));
2119
2120 #if CMK_NODE_QUEUE_AVAILABLE
2121     CsvInitialize(PCQueue, nodeBcastQ);
2122     CsvAccess(nodeBcastQ) = PCQueueCreate();
2123 #endif
2124
2125     CmiStartThreads(argv);
2126
2127     ConverseRunPE(initret);
2128 }
2129
2130 /***********************************************************************
2131  *
2132  * Abort function:
2133  *
2134  ************************************************************************/
2135
2136 void CmiAbort(const char *message) {
2137     CmiError(message);
2138     LAPI_Term(lapiContext);
2139     exit(1);
2140 }
2141
2142 static void PerrorExit(const char *msg) {
2143     perror(msg);
2144     LAPI_Term(lapiContext);
2145     exit(1);
2146 }
2147