restructure machine-common, split broadcast related
[charm.git] / src / arch / lapi / machine.c
1 /*****************************************************************************
2 LAPI version of machine layer
3 Based on the template machine layer
4
5 Developed by
6 Filippo Gioachin   03/23/05
7 Chao Mei 01/28/2010, 05/07/2011
8 ************************************************************************/
9
10 /** @file
11  * LAPI based machine layer
12  * @ingroup Machine
13  */
14 /*@{*/
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <errno.h>
19 #include <assert.h>
20
21 #include <lapi.h>
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #include <unistd.h> /*For getpid()*/
26 #include <stdlib.h> /*For sleep()*/
27
28 /* Read the following carefully before building the machine layer on LAPI */
29 /* =========BEGIN OF EXPLANATION OF MACRO USAGE=============*/
30 /**
31  * 1. non-SMP mode:
32  *   CMK_SMP = 0;
33  *   CMK_PCQUEUE_LOCK = 1; (could be removed if memory fence and atomic ops are used)
34  *
35  *   (The following two could be disabled to reduce the overhead of machine layer)
36  *     ENSURE_MSG_PAIRORDER = 0|1;
37  *     ENABLE_CONVERSE_QD = 0|1;
38  *
39  *   (If ENSURE_MSG_PAIRORDER is 1, then setting CMK_OFFLOAD_BCAST_PROCESS to 1
40  *   will make the msg seqno increase at step of 1 w/o data race;
41  *     CMK_OFFLOAD_BCAST_PROCESS = 0|1;
42  * ===========================================================
43  * 2. SMP mode without comm thd:
44  *    CMK_SMP = 1;
45  *    CMK_PCQUEUE_LOCK = 1;
46  *    CMK_SMP_NO_COMMTHD = 1;
47  *
48  *    ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
49  *
50  *    CMK_OFFLOAD_BCAST_PROCESS has same options as in non-SMP mode;
51  * ===========================================================
52  *  3. SMP mode with comm thd:
53  *     CMK_SMP = 1;
54  *     CMK_PCQUEUE_LOCK = 1;
55  *     CMK_SMP_NO_COMMTHD = 0;
56  *
57  *     ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
58  *
59  *     (The following must be set with 1 as bcast msg is dealt with in comm thd!)
60  *     CMK_OFFLOAD_BCAST_PROCESS = 1;
61  *  ===========================================================
62  *
63  *  Assumptions we made in different mode:
64  *  1. non-SMP:
65  *     a) imm msgs are processed when the proc is idle, and periodically. They should
66  *        never be processed in the LAPI thread;
67  *     b) forwarding msgs could be done on the proc or in the internal LAPI
68  *        completion handler threads;
69  *  2. SMP w/o comm thd:
70  *     a) same with non-SMP a)
71  *     b) forwarding bcast msgs could be done on proc whose rank=0;
72  *        (enable CMK_OFFLOAD_BCAST_PROCESS)  or in internal LAPI completion
73  *        handler threads;
74  *     c) the destination rank of proc-level bcast msg is always 0;
75  *  3. SMP w/ comm thd:
76  *     a) imm msgs are processed in comm thd;
77  *     b) forwarding bcast msgs is done in comm thd;
78  *     c) same with 2 c)
79  *
80  */
81 /* =========END OF EXPLANATION OF MACRO USAGE=============*/
82
83 #include "machine.h"
84
85 #if CMK_SMP
86 #define CMK_PCQUEUE_LOCK 1
87 #else
88 /**
89  *  In non-smp case: the LAPI completion handler thread will
90  *  also access the proc's recv queue (a PCQueue), so the queue
91  *  needs to be protected. The number of producers equals the
92  *  #completion handler threads, while there's only one consumer
93  *  for the queue. Right now, the #completion handler threads is
94  *  set to 1, so the atomic operation for PCQueue should be
95  *  achieved via memory fence. --Chao Mei
96  */
97
98 /* Redefine CmiNodeLocks only for PCQueue data structure */
99 #define CmiNodeLock CmiNodeLock_nonsmp
100 #undef CmiCreateLock
101 #undef CmiLock
102 #undef CmiUnlock
103 #undef CmiTryLock
104 #undef CmiDestroyLock
105 typedef pthread_mutex_t *CmiNodeLock_nonsmp;
106 CmiNodeLock CmiCreateLock() {
107     CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
108     pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
109     return lk;
110 }
111 #define CmiLock(lock) (pthread_mutex_lock(lock))
112 #define CmiUnlock(lock) (pthread_mutex_unlock(lock))
113 #define CmiTryLock(lock) (pthread_mutex_trylock(lock))
114 void CmiDestroyLock(CmiNodeLock lock) {
115     pthread_mutex_destroy(lock);
116     free(lock);
117 }
118 #define CMK_PCQUEUE_LOCK 1
119 #endif
120 #include "pcqueue.h"
121
122 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
123 /**
124  * #####REGARDING IN-ORDER MESSAGE DELIVERY BETWEEN A PAIR OF
125  * PROCESSORS#####:
126  *
127  * Since the lapi doesn't guarantee the order of msg delivery
128  * (via network) between a pair of processors, we need to ensure
129  * this order via msg seq no and a window sliding based msg
130  * receiving scheme. So two extra fields are added to the basic
131  * msg header: srcPe and seqno. For node messages, we process it
132  * like a msg to be delivered to the first proc (rank=0) of that
133  * node.
134  *
135  * BTW: The in-order delivery between two processors in the same
136  * node is guaranteed in the SMP mode as the msg transfer
137  * doesn't go through LAPI.
138  *
139  * The msg transferred through LAPI (via network) is always
140  * delivered to the first proc (whose rank is 0) on that node!
141  *
142  * --Chao Mei
143  */
144 #define ENSURE_MSG_PAIRORDER 0
145
146 #if ENSURE_MSG_PAIRORDER
147
148 /* NOTE: this feature requires O(P) space on every proc to be functional */
149
150 #define MAX_MSG_SEQNO 65535
151 /* MAX_WINDOW_SIZE should be smaller than MAX_MSG_SEQNO, and MAX(unsigned char) */
152 #define MAX_WINDOW_SIZE 128
153 #define INIT_WINDOW_SIZE 8
154
155 /* The lock to ensure the completion handler (PumpMsgsComplete) is thread-safe */
156 CmiNodeLock cmplHdlrThdLock = NULL;
157
158 /**
159  *  expectedMsgSeqNo is an int array of size "#procs". It tracks
160  *  the expected seqno recved from other procs to this proc.
161  *
162  *  nextMsgSeqNo is an int array of size "#procs". It tracks
163  *  the next seqno of the msg to be sent from this proc to other
164  *  procs.
165  *
166  *  oooMsgBuffer is an array of sizeof(void **)*(#procs), each
167  * element (created on demand) points to a window (array) size
168  * of CUR_WINDOW_SIZE which buffers the out-of-order incoming
169  * messages (a (void *) array)
170  *
171  * oooMaxOffset indicates the maximum offset of the ooo msg
172  * ahead of the expected msg. The offset begins with 1, i.e.,
173  * (offset-1) is the index of the ooo msg in the window
174  * (oooMsgBuffer)
175  *
176  *  --Chao Mei
177  */
178
179 typedef struct MsgOrderInfoStruct {
180     /* vars used on sender side */
181     int *nextMsgSeqNo;
182
183     /* vars used on recv side */
184     int *expectedMsgSeqNo;
185     void ***oooMsgBuffer;
186     unsigned char *oooMaxOffset;
187     unsigned char *CUR_WINDOW_SIZE;
188 } MsgOrderInfo;
189
190 /**
191  * Once p2p msgs are ensured in-order delivery for a pair
192  * of procs, then the bcast msg is guaranteed correspondently
193  * as the order of msgs sent is fixed (i.e. the spanning tree
194  * or the hypercube is fixed)
195  */
196 CpvDeclare(MsgOrderInfo, p2pMsgSeqInfo);
197 #endif
198
199 /**
200  *  Enable this macro will offload the broadcast relay from the
201  *  internal completion handler thread. This will make the msg
202  *  seqno free of data-race. In SMP mode with comm thread where
203  *  comm thread will forward bcast msgs, this macro should be
204  *  enabled.
205  */
206 /* TODO: need re-consideration */
207 #define CMK_OFFLOAD_BCAST_PROCESS 1
208 /* When ENSURE_MSG_PAIRORDER is enabled, CMK_OFFLOAD_BCAST_PROCESS
209  * requires to be defined because if the bcast message is processed
210  * in the lapi completion handler (the internal lapi thread), then
211  * there's possibility of data races in setting the sequence number.
212  * In SMP mode, the bcast forwarding should be offloaded from the
213  * completion handler to the comm thread to reduce the overhead if
214  * there's comm thread. If there's no commthread, and if cpv is
215  * tls-based, then CMK_OFFLOAD_BCAST_PROCESS also requires enabled
216  * because there's charm proc-private variable access would be
217  * incorrect in lapi's internal threads. -Chao Mei
218  */
219 #if (CMK_SMP && (!CMK_SMP_NO_COMMTHD || (CMK_TLS_THREAD && !CMK_NOT_USE_TLS_THREAD))) || ENSURE_MSG_PAIRORDER
220 #undef CMK_OFFLOAD_BCAST_PROCESS
221 #define CMK_OFFLOAD_BCAST_PROCESS 1
222 #endif
223
224 #if CMK_OFFLOAD_BCAST_PROCESS
225 CsvDeclare(PCQueue, procBcastQ);
226 #if CMK_NODE_QUEUE_AVAILABLE
227 CsvDeclare(PCQueue, nodeBcastQ);
228 #endif
229 #endif
230
231 /* =======End of Definitions of Performance-Specific Macros =======*/
232
233
234 /* =======Beginning of Definitions of Msg Header Specific Macros =======*/
235 /* msg size and srcpe are required info as they will be used for forwarding
236  * bcast msg and for ensuring message ordering
237  */
238 #define CMI_MSG_SRCPE(msg)               ((CmiMsgHeaderBasic *)msg)->srcpe
239 #define CMI_MSG_SEQNO(msg)               ((CmiMsgHeaderBasic *)msg)->seqno
240 /* =======End of Definitions of Msg Header Specific Macros =======*/
241
242 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
243 /* =====End of Definitions of Message-Corruption Related Macros=====*/
244
245
246 /* =====Beginning of Declarations of Machine Specific Variables===== */
247
248 static int lapiDebugMode=0;
249 CsvDeclare(int, lapiInterruptMode);
250
251 /**
252  * The lapiContext stands for the lapi context for a single lapi
253  * task. And inside one lapi task, only one lapi context could
254  * be created via lapi_init. In SMP mode, this context is
255  * created by proc of rank 0, and then it is shared among all
256  * cores on a node (threads) --Chao Mei
257  *
258  */
259 static lapi_handle_t lapiContext;
260 static lapi_long_t lapiHeaderHandler = 1;
261 /* =====End of Declarations of Machine Specific Variables===== */
262
263
264 /* =====Beginning of Declarations of Machine Specific Functions===== */
265
266 /* The way to read this macro
267  * "routine args" becomes a function call inside the parameters of check_lapi_err,
268  * and it returns a int as returnCode;
269  * "#routine" turns the "routine" as a string
270  * __LINE__ is the line number in the source file
271  * -Chao Mei
272  */
273 #define check_lapi(routine,args) \
274         check_lapi_err(routine args, #routine, __LINE__);
275
276 static void check_lapi_err(int returnCode,const char *routine,int line) {
277     if (returnCode!=LAPI_SUCCESS) {
278         char errMsg[LAPI_MAX_ERR_STRING];
279         LAPI_Msg_string(returnCode,errMsg);
280         fprintf(stderr,"Fatal LAPI error while executing %s at %s:%d\n"
281                 "  Description: %s\n", routine, __FILE__, line, errMsg);
282         CmiAbort("Fatal LAPI error");
283     }
284 }
285
286 static void lapi_err_hndlr(lapi_handle_t *hndl, int *error_code,
287                            lapi_err_t *err_type, int *task_ID, int *src) {
288     char errstr[LAPI_MAX_ERR_STRING];
289     LAPI_Msg_string(*error_code, errstr);
290     fprintf(stderr, "ERROR IN LAPI: %s for task %d at src %d\n", errstr, *task_ID, *src);
291     LAPI_Term(*hndl);
292     exit(1);
293 }
294
295 /* ===== Beginging of functions regarding ensure in-order msg delivery ===== */
296 #if ENSURE_MSG_PAIRORDER
297
298 /**
299  * "setNextMsgSeqNo" actually sets the current seqno, the
300  * "getNextMsgSeqNo" will increment the seqno, i.e.,
301  * "getNextMsgSeqNo" returns the next seqno based on the previous
302  * seqno stored in the seqno array.
303  * --Chao Mei
304  */
305 static int getNextMsgSeqNo(int *seqNoArr, int destPe) {
306     int ret = seqNoArr[destPe];
307     ret++;
308     return ret;
309 }
310 static void setNextMsgSeqNo(int *seqNoArr, int destPe, int val) {
311     /* the seq no. may fast-forward to a new round (i.e., starting from 1 again!) */
312     if (val>=MAX_MSG_SEQNO) val -= MAX_MSG_SEQNO;
313     seqNoArr[destPe] = val;
314 }
315
316 #define getNextExpectedMsgSeqNo(seqNoArr,pe) getNextMsgSeqNo(seqNoArr, pe)
317 #define setNextExpectedMsgSeqNo(seqNoArr,pe,val) setNextMsgSeqNo(seqNoArr, pe, val)
318
319 static int checkMsgInOrder(char *msg, MsgOrderInfo *info);
320 #endif
321 /* ===== End of functions regarding ensure in-order msg delivery ===== */
322
323
324 /* The machine-specific send function */
325 static CmiCommHandle MachineSendFuncForLAPI(int destNode, int size, char *msg, int mode);
326 #define LrtsSendFunc MachineSendFuncForLAPI
327
328 /* ### Beginning of Machine-startup Related Functions ### */
329 static void MachineInitForLAPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
330 #define LrtsInit MachineInitForLAPI
331
332 static void MachinePreCommonInitForLAPI(int everReturn);
333 static void MachinePostCommonInitForLAPI(int everReturn);
334 #define LrtsPreCommonInit MachinePreCommonInitForLAPI
335 #define LrtsPostCommonInit MachinePostCommonInitForLAPI
336 /* ### End of Machine-startup Related Functions ### */
337
338 /* ### Beginning of Machine-running Related Functions ### */
339 static void AdvanceCommunicationForLAPI();
340 #define LrtsAdvanceCommunication AdvanceCommunicationForLAPI
341
342 static void DrainResourcesForLAPI(); /* used when exit */
343 #define LrtsDrainResources DrainResourcesForLAPI
344
345 static void MachineExitForLAPI();
346 #define LrtsExit MachineExitForLAPI
347 /* ### End of Machine-running Related Functions ### */
348
349 /* ### Beginning of Idle-state Related Functions ### */
350 /* ### End of Idle-state Related Functions ### */
351
352 static void MachinePostNonLocalForLAPI();
353 #define LrtsPostNonLocal MachinePostNonLocalForLAPI
354
355 /* =====End of Declarations of Machine Specific Functions===== */
356
357 /**
358  *  Macros that overwrites the common codes, such as
359  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
360  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
361  *  CMK_OFFLOAD_BCAST_PROCESS etc.
362  */
363 /* For async msg sending ops, using lapi specific implementations */
364 #define USE_COMMON_ASYNC_BCAST 0
365 #define CMK_OFFLOAD_BCAST_PROCESS 1
366 #include "machine-lrts.h"
367 #include "machine-common-core.c"
368
369 /* The machine specific msg-sending function */
370
371 /* ######Beginning of functions for sending a msg ###### */
372 //lapi sending completion callback
373 /* The following two are callbacks for sync and async send respectively */
374 static void ReleaseMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
375     MACHSTATE2(2,"[%d] ReleaseMsg begin %p {",CmiMyNode(),msg);
376     check_lapi_err(info->reason, "ReleaseMsg", __LINE__);
377     CmiFree(msg);
378     MACHSTATE(2,"} ReleaseMsg end");
379 }
380
381 static void DeliveredMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
382     MACHSTATE1(2,"[%d] DeliveredMsg begin {",CmiMyNode());
383     check_lapi_err(info->reason, "DeliveredMsg", __LINE__);
384     *((int *)msg) = *((int *)msg) - 1;
385     MACHSTATE(2,"} DeliveredMsg end");
386 }
387
388 static INLINE_KEYWORD void lapiSendFn(int destNode, int size, char *msg, scompl_hndlr_t *shdlr, void *sinfo) {
389     lapi_xfer_t xfer_cmd;
390
391     MACHSTATE3(2,"lapiSendFn to destNode=%d with msg %p (isImm=%d) begin {",destNode,msg, CmiIsImmediate(msg));
392     MACHSTATE3(2, "inside lapiSendFn 1: size=%d, sinfo=%p, deliverable=%d", size, sinfo, deliverable);
393
394     MACHSTATE2(2, "Ready to call LAPI_Xfer with destNode=%d, destRank=%d",destNode,CMI_DEST_RANK(msg));
395
396     xfer_cmd.Am.Xfer_type = LAPI_AM_XFER;
397     xfer_cmd.Am.flags     = 0;
398     xfer_cmd.Am.tgt       = destNode;
399     xfer_cmd.Am.hdr_hdl   = lapiHeaderHandler;
400     xfer_cmd.Am.uhdr_len  = 0;
401     xfer_cmd.Am.uhdr      = NULL;
402     xfer_cmd.Am.udata     = msg;
403     xfer_cmd.Am.udata_len = size;
404     xfer_cmd.Am.shdlr     = shdlr;
405     xfer_cmd.Am.sinfo     = sinfo;
406     xfer_cmd.Am.tgt_cntr  = NULL;
407     xfer_cmd.Am.org_cntr  = NULL;
408     xfer_cmd.Am.cmpl_cntr = NULL;
409
410     check_lapi(LAPI_Xfer,(lapiContext, &xfer_cmd));
411
412     MACHSTATE(2,"} lapiSendFn end");
413 }
414
415 static CmiCommHandle MachineSendFuncForLAPI(int destNode, int size, char *msg, int mode) {
416     scompl_hndlr_t *shdlr = NULL;
417     void *sinfo = NULL;
418
419     if (mode==P2P_SYNC) {
420         shdlr = ReleaseMsg;
421         sinfo = (void *)msg;
422     } else if (mode==P2P_ASYNC) {
423         shdlr = DeliveredMsg;
424         sinfo = malloc(sizeof(int));
425         *((int *)sinfo) = 1;
426     }
427
428     CMI_MSG_SIZE(msg) = size;
429
430 #if ENSURE_MSG_PAIRORDER
431 #if CMK_NODE_QUEUE_AVAILABLE
432     if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE) {
433         lapiSendFn(destNode, size, msg, shdlr, sinfo);
434         return sinfo;
435     }
436 #endif
437     int destPE = CmiNodeFirst(destNode)+CMI_DEST_RANK(msg);
438     CMI_MSG_SRCPE(msg) = CmiMyPe();
439     /* Note: This could be executed on comm threads, where CmiMyPe() >= CmiNumPes() */
440     CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
441     setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
442 #endif
443
444     lapiSendFn(destNode, size, msg, shdlr, sinfo);
445     return sinfo;
446 }
447
448 /* Lapi-specific implementation of async msg sending operations */
449 #if !USE_COMMON_ASYNC_BCAST
450 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
451 #if ENSURE_MSG_PAIRORDER
452     /* Not sure how to add the msg seq no for async broadcast messages --Chao Mei */
453     /* so abort here ! */
454     CmiAssert(0);
455     return 0;
456 #else
457     int i, rank;
458     int mype = CmiMyPe();
459 #if ENABLE_CONVERSE_QD
460     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
461 #endif
462     MACHSTATE1(3,"[%d] Sending async broadcast message from {",CmiMyNode());
463     CMI_BROADCAST_ROOT(msg) = 0;
464     void *handle = malloc(sizeof(int));
465     *((int *)handle) = CmiNumPes()-1;
466
467     for (i=mype+1; i<CmiNumPes(); i++) {
468         CMI_DEST_RANK(msg) = CmiRankOf(i);
469         lapiSendFn(CmiNodeOf(i), size, msg, DeliveredMsg, handle);
470     }
471     for (i=0; i<mype; i++) {
472         CMI_DEST_RANK(msg) = CmiRankOf(i);
473         lapiSendFn(CmiNodeOf(i), size, msg, DeliveredMsg, handle);
474     }
475
476     MACHSTATE(3,"} Sending async broadcast message end");
477     return handle;
478 #endif
479 }
480
481 #if CMK_NODE_QUEUE_AVAILABLE
482 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
483     int i;
484
485 #if ENABLE_CONVERSE_QD
486     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
487 #endif
488
489     MACHSTATE1(3,"[%d] Sending async node broadcast message from {",CmiMyNode());
490     CMI_BROADCAST_ROOT(msg) = 0;
491     CMI_DEST_RANK(msg) =DGRAM_NODEMESSAGE;
492     void *handle = malloc(sizeof(int));
493     *((int *)handle) = CmiNumNodes()-1;
494     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {
495         lapiSendFn(i, size, msg, DeliveredMsg, handle);
496     }
497     for (i=0; i<CmiMyNode(); i++) {
498         lapiSendFn(i, size, msg, DeliveredMsg, handle);
499     }
500
501     MACHSTATE(3,"} Sending async broadcast message end");
502     return handle;
503 }
504 #endif
505 #endif/* end of !USE_COMMON_ASYNC_BCAST */
506
507 int CmiAsyncMsgSent(CmiCommHandle handle) {
508     return (*((int *)handle) == 0)?1:0;
509 }
510
511 void CmiReleaseCommHandle(CmiCommHandle handle) {
512 #ifndef CMK_OPTIMIZE
513     if (*((int *)handle) != 0) CmiAbort("Released a CmiCommHandle not free!");
514 #endif
515     free(handle);
516 }
517 /* ######End of functions for sending a msg ###### */
518
519 /* ######Beginning of functions for receiving a msg ###### */
520 /* lapi recv callback when the first packet of the msg arrives as header handler*/
521 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
522                            void *hdr, uint *uhdr_len,
523                            lapi_return_info_t *msg_info,
524                            compl_hndlr_t **comp_h, void **comp_am_info);
525 /* lapi recv completion callback when all the msg is received */
526 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info);
527
528 /** lapi header handler: executed on the recv side, when the
529  *  first packet of the recving msg arrives, it is called to
530  *  prepare the memory buffer in the user space for recving the
531  *  data --Chao Mei
532  */
533 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
534                            void *hdr, uint *uhdr_len,
535                            lapi_return_info_t *msg_info,
536                            compl_hndlr_t **comp_h, void **comp_am_info) {
537     void *msg_buf;
538     MACHSTATE1(2,"[%d] PumpMsgsBegin begin {",CmiMyNode());
539     /* prepare the space for receiving the data, set the completion handler to
540        be executed inline */
541     msg_buf = (void *)CmiAlloc(msg_info->msg_len);
542
543     msg_info->ret_flags = LAPI_SEND_REPLY;
544     *comp_h = PumpMsgsComplete;
545     *comp_am_info = msg_buf;
546     MACHSTATE(2,"} PumpMsgsBegin end");
547     return msg_buf;
548
549 }
550
551 /**
552   * lapi completion handler on the recv side. It's responsible to push messages
553   * to the destination proc or relay broadcast messages. --Chao Mei
554   *
555   * Note: The completion handler could be executed on any cores within a node ???
556   * So in SMP mode when there's a comm thread, the completion handler should be carefully
557   * dealt with.
558   *
559   * Given lapi also provides an internal lapi thread to deal with network progress which
560   * will call this function (???), we should be careful with the following situations:
561   * 1) non SMP mode, with interrupt (lapi internal completion thread)
562   * 2) non SMP mode, with polling (machine layer is responsible for network progress)
563   * 3) SMP mode, no comm thread, with polling
564   * 4) SMP mode, no comm thread, with interrupt
565   * 5) SMP mode, with comm thread, with polling (not yet implemented, comm server is empty right now)
566   * 6) SMP mode, with comm thread, with interrupt??
567   *
568   * Currently, SMP mode without comm thread is undergoing implementation.
569   *
570   * This function is executed by LAPI internal threads. It seems that the number of internal
571   * completion handler threads could vary during the program. LAPI adaptively creates more
572   * threads if there are more outstanding messages!!!! This means pcqueue needs protection
573   * even in the nonsmp case!!!!
574   *
575   * --Chao Mei
576   */
577 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info) {
578     int i;
579     char *msg = am_info;
580     int broot, destrank;
581
582     MACHSTATE3(2,"[%d] PumpMsgsComplete with msg %p (isImm=%d) begin {",CmiMyNode(), msg, CmiIsImmediate(msg));
583 #if ENSURE_MSG_PAIRORDER
584     MACHSTATE3(2,"msg %p info: srcpe=%d, seqno=%d", msg, CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
585 #endif
586     /**
587      * First, we check if the msg is a broadcast msg via spanning
588      * tree. If it is, it needs to call SendSpanningChildren to
589      * relay the broadcast, and then send the msg to every cores on
590      * this node.
591      *
592      * After the first check, we deal with normal messages.
593      * --Chao Mei
594      */
595     /* It's the right place to relay the broadcast message */
596     /**
597      * 1. For in-order delivery, because this is the handler for
598      * receiving a message, and we assume the cross-network msgs are
599      * always delivered to the first proc (rank 0) of this node, we
600      * select the srcpe of the bcast msgs and the next msg seq no
601      * correspondingly.
602      *
603      * --Chao Mei
604      */
605 #if ENSURE_MSG_PAIRORDER
606     broot = CMI_BROADCAST_ROOT(msg);
607     destrank = CMI_DEST_RANK(msg);
608     /* Only check proc-level msgs */
609     if (broot>=0
610 #if CMK_NODE_QUEUE_AVAILABLE
611             && destrank != DGRAM_NODEMESSAGE
612 #endif
613        ) {
614         MsgOrderInfo *info;
615         info = &CpvAccessOther(p2pMsgSeqInfo, destrank);
616         MACHSTATE1(2, "Check msg in-order for p2p msg %p", msg);
617
618         if (checkMsgInOrder(msg,info)) {
619             MACHSTATE(2,"} PumpMsgsComplete end ");
620             return;
621         }
622     }
623 #endif
624
625     handleOneRecvedMsg(CMI_MSG_SIZE(msg), msg);
626
627     MACHSTATE(2,"} PumpMsgsComplete end ");
628     return;
629 }
630
631 /* utility function for ensuring the message pair-ordering */
632 #if ENSURE_MSG_PAIRORDER
633 /* return 1 if this msg is an out-of-order incoming message */
634 /**
635  * Returns 1 if this "msg" is an out-of-order message, or
636  * this "msg" is a late message which triggers the process
637  * of all buffered ooo msgs.
638  * --Chao Mei
639  */
640 static int checkMsgInOrder(char *msg, MsgOrderInfo *info) {
641     int srcpe, destrank;
642     int incomingSeqNo, expectedSeqNo;
643     int curOffset, maxOffset;
644     int i, curWinSize;
645     void **destMsgBuffer = NULL;
646
647     /* numMsg is the number of msgs to be processed in this buffer*/
648     /* Reason to have this extra copy of msgs to be processed: Reduce the atomic granularity */
649     void **toProcessMsgBuffer;
650     int numMsgs = 0;
651
652     srcpe = CMI_MSG_SRCPE(msg);
653     destrank = CMI_DEST_RANK(msg);
654     incomingSeqNo = CMI_MSG_SEQNO(msg);
655
656     CmiLock(cmplHdlrThdLock);
657
658     expectedSeqNo = getNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe);
659     if (expectedSeqNo == incomingSeqNo) {
660         /* Two cases: has ooo msg buffered or not */
661         maxOffset = (info->oooMaxOffset)[srcpe];
662         if (maxOffset>0) {
663             MACHSTATE1(4, "Processing all buffered ooo msgs (maxOffset=%d) including the just recved begin {", maxOffset);
664             curWinSize = info->CUR_WINDOW_SIZE[srcpe];
665             toProcessMsgBuffer = malloc((curWinSize+1)*sizeof(void *));
666             /* process the msg just recved */
667             toProcessMsgBuffer[numMsgs++] = msg;
668             /* process the buffered ooo msg until the first empty slot in the window */
669             destMsgBuffer = (info->oooMsgBuffer)[srcpe];
670             for (curOffset=0; curOffset<maxOffset; curOffset++) {
671                 char *curMsg = destMsgBuffer[curOffset];
672                 if (curMsg == NULL) {
673                     CmiAssert(curOffset!=(maxOffset-1));
674                     break;
675                 }
676                 toProcessMsgBuffer[numMsgs++] = curMsg;
677                 destMsgBuffer[curOffset] = NULL;
678             }
679             /* Update expected seqno, maxOffset and slide the window */
680             if (curOffset < maxOffset) {
681                 int i;
682                 /**
683                  * now, the seqno of the next to-be-recved msg should be
684                  * "expectedSeqNo+curOffset+1" as the seqno of the just
685                  * processed msg is "expectedSeqNo+curOffset. We need to slide
686                  * the msg buffer window from "curOffset+1" because the first
687                  * element of the buffer window should always points to the ooo
688                  * msg that's 1 in terms of seqno ahead of the next to-be-recved
689                  * msg. --Chao Mei
690                  */
691
692                 /* moving [curOffset+1, maxOffset) to [0, maxOffset-curOffset-1) in the window */
693                 /* The following two loops could be combined --Chao Mei */
694                 for (i=0; i<maxOffset-curOffset-1; i++) {
695                     destMsgBuffer[i] = destMsgBuffer[curOffset+i+1];
696                 }
697                 for (i=maxOffset-curOffset-1; i<maxOffset; i++) {
698                     destMsgBuffer[i] = NULL;
699                 }
700                 (info->oooMaxOffset)[srcpe] = maxOffset-curOffset-1;
701                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+curOffset);
702             } else {
703                 /* there's no remaining buffered ooo msgs */
704                 (info->oooMaxOffset)[srcpe] = 0;
705                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+maxOffset);
706             }
707
708             CmiUnlock(cmplHdlrThdLock);
709
710             /* Process the msgs */
711             for (i=0; i<numMsgs; i++) {
712                 char *curMsg = toProcessMsgBuffer[i];
713                 if (CMI_BROADCAST_ROOT(curMsg)>0) {
714
715 #if CMK_OFFLOAD_BCAST_PROCESS
716                     PCQueuePush(CsvAccess(procBcastQ), curMsg);
717 #else
718                     processProcBcastMsg(CMI_MSG_SIZE(curMsg), curMsg);
719 #endif
720                 } else {
721                     CmiPushPE(CMI_DEST_RANK(curMsg), curMsg);
722                 }
723             }
724
725             free(toProcessMsgBuffer);
726
727             MACHSTATE1(4, "Processing all buffered ooo msgs (actually processed %d) end }", curOffset);
728             /**
729              * Since we have processed all buffered ooo msgs including
730              * this just recved one, 1 should be returned so that this
731              * msg no longer needs processing
732              */
733             return 1;
734         } else {
735             /* An expected msg recved without any ooo msg buffered */
736             MACHSTATE1(4, "Receiving an expected msg with seqno=%d\n", incomingSeqNo);
737             setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo);
738
739             CmiUnlock(cmplHdlrThdLock);
740             return 0;
741         }
742     }
743
744     MACHSTATE2(4, "Receiving an out-of-order msg with seqno=%d, but expect seqno=%d", incomingSeqNo, expectedSeqNo);
745     curWinSize = info->CUR_WINDOW_SIZE[srcpe];
746     if ((info->oooMsgBuffer)[srcpe]==NULL) {
747         (info->oooMsgBuffer)[srcpe] = malloc(curWinSize*sizeof(void *));
748         memset((info->oooMsgBuffer)[srcpe], 0, curWinSize*sizeof(void *));
749     }
750     destMsgBuffer = (info->oooMsgBuffer)[srcpe];
751     curOffset = incomingSeqNo - expectedSeqNo;
752     maxOffset = (info->oooMaxOffset)[srcpe];
753     if (curOffset<0) {
754         /* It's possible that the seqNo starts with another round (exceeding MAX_MSG_SEQNO) with 1 */
755         curOffset += MAX_MSG_SEQNO;
756     }
757     if (curOffset > curWinSize) {
758         int newWinSize;
759         if (curOffset > MAX_WINDOW_SIZE) {
760             CmiAbort("Exceeding the MAX_WINDOW_SIZE!\n");
761         }
762         newWinSize = ((curOffset/curWinSize)+1)*curWinSize;
763         /*CmiPrintf("[%d]: WARNING: INCREASING WINDOW SIZE FROM %d TO %d\n", CmiMyPe(), curWinSize, newWinSize);*/
764         (info->oooMsgBuffer)[srcpe] = malloc(newWinSize*sizeof(void *));
765         memset((info->oooMsgBuffer)[srcpe], 0, newWinSize*sizeof(void *));
766         memcpy((info->oooMsgBuffer)[srcpe], destMsgBuffer, curWinSize*sizeof(void *));
767         info->CUR_WINDOW_SIZE[srcpe] = newWinSize;
768         free(destMsgBuffer);
769         destMsgBuffer = (info->oooMsgBuffer)[srcpe];
770     }
771     CmiAssert(destMsgBuffer[curOffset-1] == NULL);
772     destMsgBuffer[curOffset-1] = msg;
773     if (curOffset > maxOffset) (info->oooMaxOffset)[srcpe] = curOffset;
774
775     CmiUnlock(cmplHdlrThdLock);
776     return 1;
777 }
778 #endif /* end of ENSURE_MSG_PAIRORDER */
779
780 /* ######End of functions for receiving a msg ###### */
781
782 /* ######Beginning of functions related with communication progress ###### */
783
784 static INLINE_KEYWORD void AdvanceCommunicationForLAPI() {
785     /* What about CMK_SMP_NO_COMMTHD in the original implementation?? */
786     /* It does nothing but sleep */
787     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
788 }
789 /* ######End of functions related with communication progress ###### */
790
791 static void MachinePostNonLocalForLAPI() {
792     /* None here */
793 }
794
795 /**
796  * TODO: What will be the effects if calling LAPI_Probe in the
797  * interrupt mode??? --Chao Mei
798  */
799 /* Network progress function is used to poll the network when for
800    messages. This flushes receive buffers on some  implementations*/
801 #if CMK_MACHINE_PROGRESS_DEFINED
802 void CmiMachineProgressImpl() {
803     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
804
805 #if CMK_IMMEDIATE_MSG
806     MACHSTATE1(2, "[%d] Handling Immediate Message begin {",CmiMyNode());
807     CmiHandleImmediate();
808     MACHSTATE1(2, "[%d] Handling Immediate Message end }",CmiMyNode());
809 #endif
810
811 #if CMK_SMP && !CMK_SMP_NO_COMMTHD && CMK_OFFLOAD_BCAST_PROCESS
812     if (CmiMyRank()==CmiMyNodeSize()) processBcastQs(); /* FIXME ????????????????*/
813 #endif
814 }
815 #endif
816
817 /* ######Beginning of functions related with exiting programs###### */
818 void DrainResourcesForLAPI() {
819     /* None here */
820 }
821
822 void MachineExitForLAPI(void) {
823     check_lapi(LAPI_Gfence, (lapiContext));
824     check_lapi(LAPI_Term, (lapiContext));
825     exit(EXIT_SUCCESS);
826 }
827 /* ######End of functions related with exiting programs###### */
828
829
830 /* ######Beginning of functions related with starting programs###### */
831 /**
832  *  Obtain the number of nodes, my node id, and consuming machine layer
833  *  specific arguments
834  */
835 static void MachineInitForLAPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
836
837     lapi_info_t info;
838     char **largv = *argv;
839
840     memset(&info,0,sizeof(info));
841
842     /* Register error handler (redundant?) -- added by Chao Mei*/
843     info.err_hndlr = (LAPI_err_hndlr *)lapi_err_hndlr;
844
845     /* Indicates the number of completion handler threads to create */
846     /* The number of completion hndlr thds will affect the atomic PCQueue operations!! */
847     /* NOTE: num_compl_hndlr_thr is obsolete now! --Chao Mei */
848     /* info.num_compl_hndlr_thr = 1; */
849
850     check_lapi(LAPI_Init,(&lapiContext, &info));
851
852     /* It's a good idea to start with a fence,
853        because packets recv'd before a LAPI_Init are just dropped. */
854     check_lapi(LAPI_Gfence,(lapiContext));
855
856     check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, myNodeID));
857     check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, numNodes));
858
859     /* Make polling as the default mode as real apps have better perf */
860     CsvAccess(lapiInterruptMode) = 0;
861     if (CmiGetArgFlag(largv,"+poll")) CsvAccess(lapiInterruptMode) = 0;
862     if (CmiGetArgFlag(largv,"+nopoll")) CsvAccess(lapiInterruptMode) = 1;
863
864     check_lapi(LAPI_Senv,(lapiContext, ERROR_CHK, lapiDebugMode));
865     check_lapi(LAPI_Senv,(lapiContext, INTERRUPT_SET, CsvAccess(lapiInterruptMode)));
866
867     if (*myNodeID == 0) {
868         printf("Running lapi in interrupt mode: %d\n", CsvAccess(lapiInterruptMode));
869         printf("Running lapi with %d completion handler threads.\n", info.num_compl_hndlr_thr);
870     }
871
872     /**
873      *  Associate PumpMsgsBegin with var "lapiHeaderHandler". Then inside Xfer calls,
874      *  lapiHeaderHandler could be used to indicate the callback
875      *  instead of PumpMsgsBegin --Chao Mei
876      */
877     check_lapi(LAPI_Addr_set,(lapiContext,(void *)PumpMsgsBegin,lapiHeaderHandler));
878
879     if (CmiGetArgFlag(largv,"++debug")) {  /*Pause so user has a chance to start and attach debugger*/
880         printf("CHARMDEBUG> Processor %d has PID %d\n",*myNodeID,getpid());
881         if (!CmiGetArgFlag(largv,"++debug-no-pause"))
882             sleep(30);
883     }
884
885 #if ENSURE_MSG_PAIRORDER
886     cmplHdlrThdLock = CmiCreateLock();
887 #endif
888 }
889
890 #if MACHINE_DEBUG_LOG
891 CpvDeclare(FILE *, debugLog);
892 #endif
893
894 #if ENSURE_MSG_PAIRORDER
895 static void initMsgOrderInfo(MsgOrderInfo *info) {
896     int i;
897     int totalPEs = CmiNumPes();
898 #if CMK_SMP && CMK_OFFLOAD_BCAST_PROCESS
899     /* the comm thread will also access such info */
900     totalPEs += CmiNumNodes();
901 #endif
902     info->nextMsgSeqNo = malloc(totalPEs*sizeof(int));
903     memset(info->nextMsgSeqNo, 0, totalPEs*sizeof(int));
904
905     info->expectedMsgSeqNo = malloc(totalPEs*sizeof(int));
906     memset(info->expectedMsgSeqNo, 0, totalPEs*sizeof(int));
907
908     info->oooMsgBuffer = malloc(totalPEs*sizeof(void **));
909     memset(info->oooMsgBuffer, 0, totalPEs*sizeof(void **));
910
911     info->oooMaxOffset = malloc(totalPEs*sizeof(unsigned char));
912     memset(info->oooMaxOffset, 0, totalPEs*sizeof(unsigned char));
913
914     info->CUR_WINDOW_SIZE = malloc(totalPEs*sizeof(unsigned char));
915     for (i=0; i<totalPEs; i++) info->CUR_WINDOW_SIZE[i] = INIT_WINDOW_SIZE;
916 }
917 #endif
918
919 static void MachinePreCommonInitForLAPI(int everReturn) {
920 #if ENSURE_MSG_PAIRORDER
921     CpvInitialize(MsgOrderInfo, p2pMsgSeqInfo);
922     initMsgOrderInfo(&CpvAccess(p2pMsgSeqInfo));
923 #endif
924
925 #if MACHINE_DEBUG_LOG
926     {
927         char ln[200];
928         sprintf(ln,"debugLog.%d",CmiMyPe());
929         CpvInitialize(FILE *, debugLog);
930         CpvAccess(debugLog)=fopen(ln,"w");
931     }
932 #endif
933
934 }
935
936 static void MachinePostCommonInitForLAPI(int everReturn) {
937     /**
938          * In SMP, the machine layer usually has one comm thd, and it is
939          * designed to be responsible for all network communication. So
940          * if there's no dedicated processor for the comm thread, it has
941          * to share a proc with a worker thread. In this scenario,
942          * the worker thread needs to yield for some time to give CPU
943          * time to comm thread. However, in current configuration, we
944          * will always dedicate one proc for the comm thd, therefore,
945          * such yielding scheme is not necessary.  Besides, avoiding
946          * this yielding scheme improves performance because worker
947          * thread doesn't need to yield and will be more responsive to
948          * incoming messages. So, we will always use CmiNotifyIdle
949          * instead.
950          *
951          * --Chao Mei
952          */
953
954     /* Not registering any Idle-state related functions right now!! */
955
956 #if 0 && CMK_SMP
957     s=CmiNotifyGetState();
958     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
959     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
960 #else
961     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
962 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
963     /* If there's comm thread, then comm thd is responsible for advancing comm */
964     if (!CsvAccess(lapiInterruptMode)) {
965         CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn)AdvanceCommunicationForLAPI, NULL);
966     }
967 #endif
968 #endif
969 }
970 /* ######End of functions related with starting programs###### */
971
972 /***********************************************************************
973  *
974  * Abort function:
975  *
976  ************************************************************************/
977
978 void CmiAbort(const char *message) {
979     CmiError(message);
980     LAPI_Term(lapiContext);
981     exit(1);
982 }
983
984 /* What's the difference of this function with CmiAbort????
985    and whether LAPI_Term is needed?? It should be shared across
986    all machine layers.
987 static void PerrorExit(const char *msg) {
988     perror(msg);
989     LAPI_Term(lapiContext);
990     exit(1);
991 }
992 */
993
994 /* Barrier related functions */
995 /*TODO: does lapi provide any Barrrier related functions as DCMF provides??? --Chao Mei */
996 /* Barrier needs to be implemented!!! -Chao Mei */
997 /* These two barriers are only needed by CmiTimerInit to synchronize all the
998    threads. They do not need to provide a general barrier. */
999 int CmiBarrier() {
1000     return 0;
1001 }
1002 int CmiBarrierZero() {
1003     return 0;
1004 }
1005
1006
1007 /*@}*/
1008