First check-in for the work on extractin common codes from MPI, LAPI and DCMF layer...
[charm.git] / src / arch / lapi / machine.c
1 /*****************************************************************************
2 LAPI version of machine layer
3 Based on the template machine layer
4
5 Developed by
6 Filippo Gioachin   03/23/05
7 Chao Mei 01/28/2010, 05/07/2011
8 ************************************************************************/
9
10 /** @file
11  * LAPI based machine layer
12  * @ingroup Machine
13  */
14 /*@{*/
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <errno.h>
19 #include <assert.h>
20
21 #include <lapi.h>
22 #include "converse.h"
23
24 /*Support for ++debug: */
25 #include <unistd.h> /*For getpid()*/
26 #include <stdlib.h> /*For sleep()*/
27
28 /* Read the following carefully before building the machine layer on LAPI */
29 /* =========BEGIN OF EXPLANATION OF MACRO USAGE=============*/
30 /**
31  * 1. non-SMP mode:
32  *   CMK_SMP = 0;
33  *   CMK_PCQUEUE_LOCK = 1; (could be removed if memory fence and atomic ops are used)
34  *
35  *   (The following two could be disabled to reduce the overhead of machine layer)
36  *     ENSURE_MSG_PAIRORDER = 0|1;
37  *     ENABLE_CONVERSE_QD = 0|1;
38  *
39  *   (If ENSURE_MSG_PAIRORDER is 1, then setting CMK_OFFLOAD_BCAST_PROCESS to 1
40  *   will make the msg seqno increase at step of 1 w/o data race;
41  *     CMK_OFFLOAD_BCAST_PROCESS = 0|1;
42  * ===========================================================
43  * 2. SMP mode without comm thd:
44  *    CMK_SMP = 1;
45  *    CMK_PCQUEUE_LOCK = 1;
46  *    CMK_SMP_NO_COMMTHD = 1;
47  *
48  *    ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
49  *
50  *    CMK_OFFLOAD_BCAST_PROCESS has same options as in non-SMP mode;
51  * ===========================================================
52  *  3. SMP mode with comm thd:
53  *     CMK_SMP = 1;
54  *     CMK_PCQUEUE_LOCK = 1;
55  *     CMK_SMP_NO_COMMTHD = 0;
56  *
57  *     ENSURE_MSG_PAIRORDER and ENABLE_CONVERSE_QD have same options as in non-SMP mode;
58  *
59  *     (The following must be set with 1 as bcast msg is dealt with in comm thd!)
60  *     CMK_OFFLOAD_BCAST_PROCESS = 1;
61  *  ===========================================================
62  *
63  *  Assumptions we made in different mode:
64  *  1. non-SMP:
65  *     a) imm msgs are processed when the proc is idle, and periodically. They should
66  *        never be processed in the LAPI thread;
67  *     b) forwarding msgs could be done on the proc or in the internal LAPI
68  *        completion handler threads;
69  *  2. SMP w/o comm thd:
70  *     a) same with non-SMP a)
71  *     b) forwarding bcast msgs could be done on proc whose rank=0;
72  *        (enable CMK_OFFLOAD_BCAST_PROCESS)  or in internal LAPI completion
73  *        handler threads;
74  *     c) the destination rank of proc-level bcast msg is always 0;
75  *  3. SMP w/ comm thd:
76  *     a) imm msgs are processed in comm thd;
77  *     b) forwarding bcast msgs is done in comm thd;
78  *     c) same with 2 c)
79  *
80  */
81 /* =========END OF EXPLANATION OF MACRO USAGE=============*/
82
83 #include "machine.h"
84
85 #if CMK_SMP
86 #define CMK_PCQUEUE_LOCK 1
87 #else
88 /**
89  *  In non-smp case: the LAPI completion handler thread will
90  *  also access the proc's recv queue (a PCQueue), so the queue
91  *  needs to be protected. The number of producers equals the
92  *  #completion handler threads, while there's only one consumer
93  *  for the queue. Right now, the #completion handler threads is
94  *  set to 1, so the atomic operation for PCQueue should be
95  *  achieved via memory fence. --Chao Mei
96  */
97
98 /* Redefine CmiNodeLocks only for PCQueue data structure */
99 #define CmiNodeLock CmiNodeLock_nonsmp
100 #undef CmiCreateLock
101 #undef CmiLock
102 #undef CmiUnlock
103 #undef CmiTryLock
104 #undef CmiDestroyLock
105 typedef pthread_mutex_t *CmiNodeLock_nonsmp;
106 CmiNodeLock CmiCreateLock() {
107     CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
108     pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
109     return lk;
110 }
111 #define CmiLock(lock) (pthread_mutex_lock(lock))
112 #define CmiUnlock(lock) (pthread_mutex_unlock(lock))
113 #define CmiTryLock(lock) (pthread_mutex_trylock(lock))
114 void CmiDestroyLock(CmiNodeLock lock) {
115     pthread_mutex_destroy(lock);
116     free(lock);
117 }
118 #define CMK_PCQUEUE_LOCK 1
119 #endif
120 #include "pcqueue.h"
121
122 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
123 /**
124  * #####REGARDING IN-ORDER MESSAGE DELIVERY BETWEEN A PAIR OF
125  * PROCESSORS#####:
126  *
127  * Since the lapi doesn't guarantee the order of msg delivery
128  * (via network) between a pair of processors, we need to ensure
129  * this order via msg seq no and a window sliding based msg
130  * receiving scheme. So two extra fields are added to the basic
131  * msg header: srcPe and seqno. For node messages, we process it
132  * like a msg to be delivered to the first proc (rank=0) of that
133  * node.
134  *
135  * BTW: The in-order delivery between two processors in the same
136  * node is guaranteed in the SMP mode as the msg transfer
137  * doesn't go through LAPI.
138  *
139  * The msg transferred through LAPI (via network) is always
140  * delivered to the first proc (whose rank is 0) on that node!
141  *
142  * --Chao Mei
143  */
144 #define ENSURE_MSG_PAIRORDER 0
145
146 #if ENSURE_MSG_PAIRORDER
147
148 /* NOTE: this feature requires O(P) space on every proc to be functional */
149
150 #define MAX_MSG_SEQNO 65535
151 /* MAX_WINDOW_SIZE should be smaller than MAX_MSG_SEQNO, and MAX(unsigned char) */
152 #define MAX_WINDOW_SIZE 128
153 #define INIT_WINDOW_SIZE 8
154
155 /* The lock to ensure the completion handler (PumpMsgsComplete) is thread-safe */
156 CmiNodeLock cmplHdlrThdLock = NULL;
157
158 /**
159  *  expectedMsgSeqNo is an int array of size "#procs". It tracks
160  *  the expected seqno recved from other procs to this proc.
161  *
162  *  nextMsgSeqNo is an int array of size "#procs". It tracks
163  *  the next seqno of the msg to be sent from this proc to other
164  *  procs.
165  *
166  *  oooMsgBuffer is an array of sizeof(void **)*(#procs), each
167  * element (created on demand) points to a window (array) size
168  * of CUR_WINDOW_SIZE which buffers the out-of-order incoming
169  * messages (a (void *) array)
170  *
171  * oooMaxOffset indicates the maximum offset of the ooo msg
172  * ahead of the expected msg. The offset begins with 1, i.e.,
173  * (offset-1) is the index of the ooo msg in the window
174  * (oooMsgBuffer)
175  *
176  *  --Chao Mei
177  */
178
179 typedef struct MsgOrderInfoStruct {
180     /* vars used on sender side */
181     int *nextMsgSeqNo;
182
183     /* vars used on recv side */
184     int *expectedMsgSeqNo;
185     void ***oooMsgBuffer;
186     unsigned char *oooMaxOffset;
187     unsigned char *CUR_WINDOW_SIZE;
188 } MsgOrderInfo;
189
190 /**
191  * Once p2p msgs are ensured in-order delivery for a pair
192  * of procs, then the bcast msg is guaranteed correspondently
193  * as the order of msgs sent is fixed (i.e. the spanning tree
194  * or the hypercube is fixed)
195  */
196 CpvDeclare(MsgOrderInfo, p2pMsgSeqInfo);
197 #endif
198
199 /**
200  *  Enable this macro will offload the broadcast relay from the
201  *  internal completion handler thread. This will make the msg
202  *  seqno free of data-race. In SMP mode with comm thread where
203  *  comm thread will forward bcast msgs, this macro should be
204  *  enabled.
205  */
206 /* TODO: need re-consideration */
207 #define CMK_OFFLOAD_BCAST_PROCESS 1
208 /* When ENSURE_MSG_PAIRORDER is enabled, CMK_OFFLOAD_BCAST_PROCESS
209  * requires to be defined because if the bcast message is processed
210  * in the lapi completion handler (the internal lapi thread), then
211  * there's possibility of data races in setting the sequence number.
212  * In SMP mode, the bcast forwarding should be offloaded from the
213  * completion handler to the comm thread to reduce the overhead if
214  * there's comm thread. If there's no commthread, and if cpv is
215  * tls-based, then CMK_OFFLOAD_BCAST_PROCESS also requires enabled
216  * because there's charm proc-private variable access would be
217  * incorrect in lapi's internal threads. -Chao Mei
218  */
219 #if (CMK_SMP && (!CMK_SMP_NO_COMMTHD || (CMK_TLS_THREAD && !CMK_NOT_USE_TLS_THREAD))) || ENSURE_MSG_PAIRORDER
220 #undef CMK_OFFLOAD_BCAST_PROCESS
221 #define CMK_OFFLOAD_BCAST_PROCESS 1
222 #endif
223
224 #if CMK_OFFLOAD_BCAST_PROCESS
225 CsvDeclare(PCQueue, procBcastQ);
226 #if CMK_NODE_QUEUE_AVAILABLE
227 CsvDeclare(PCQueue, nodeBcastQ);
228 #endif
229 #endif
230
231 /* =======End of Definitions of Performance-Specific Macros =======*/
232
233
234 /* =======Beginning of Definitions of Msg Header Specific Macros =======*/
235 /* msg size and srcpe are required info as they will be used for forwarding
236  * bcast msg and for ensuring message ordering
237  */
238 #define CMI_MSG_SRCPE(msg)               ((CmiMsgHeaderBasic *)msg)->srcpe
239 #define CMI_MSG_SEQNO(msg)               ((CmiMsgHeaderBasic *)msg)->seqno
240 /* =======End of Definitions of Msg Header Specific Macros =======*/
241
242 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
243 /* =====End of Definitions of Message-Corruption Related Macros=====*/
244
245
246 /* =====Beginning of Declarations of Machine Specific Variables===== */
247
248 static int lapiDebugMode=0;
249 CsvDeclare(int, lapiInterruptMode);
250
251 /**
252  * The lapiContext stands for the lapi context for a single lapi
253  * task. And inside one lapi task, only one lapi context could
254  * be created via lapi_init. In SMP mode, this context is
255  * created by proc of rank 0, and then it is shared among all
256  * cores on a node (threads) --Chao Mei
257  *
258  */
259 static lapi_handle_t lapiContext;
260 static lapi_long_t lapiHeaderHandler = 1;
261 /* =====End of Declarations of Machine Specific Variables===== */
262
263
264 /* =====Beginning of Declarations of Machine Specific Functions===== */
265
266 /* The way to read this macro
267  * "routine args" becomes a function call inside the parameters of check_lapi_err,
268  * and it returns a int as returnCode;
269  * "#routine" turns the "routine" as a string
270  * __LINE__ is the line number in the source file
271  * -Chao Mei
272  */
273 #define check_lapi(routine,args) \
274         check_lapi_err(routine args, #routine, __LINE__);
275
276 static void check_lapi_err(int returnCode,const char *routine,int line) {
277     if (returnCode!=LAPI_SUCCESS) {
278         char errMsg[LAPI_MAX_ERR_STRING];
279         LAPI_Msg_string(returnCode,errMsg);
280         fprintf(stderr,"Fatal LAPI error while executing %s at %s:%d\n"
281                 "  Description: %s\n", routine, __FILE__, line, errMsg);
282         CmiAbort("Fatal LAPI error");
283     }
284 }
285
286 static void lapi_err_hndlr(lapi_handle_t *hndl, int *error_code,
287                            lapi_err_t *err_type, int *task_ID, int *src) {
288     char errstr[LAPI_MAX_ERR_STRING];
289     LAPI_Msg_string(*error_code, errstr);
290     fprintf(stderr, "ERROR IN LAPI: %s for task %d at src %d\n", errstr, *task_ID, *src);
291     LAPI_Term(*hndl);
292     exit(1);
293 }
294
295 /* ===== Beginging of functions regarding ensure in-order msg delivery ===== */
296 #if ENSURE_MSG_PAIRORDER
297
298 /**
299  * "setNextMsgSeqNo" actually sets the current seqno, the
300  * "getNextMsgSeqNo" will increment the seqno, i.e.,
301  * "getNextMsgSeqNo" returns the next seqno based on the previous
302  * seqno stored in the seqno array.
303  * --Chao Mei
304  */
305 static int getNextMsgSeqNo(int *seqNoArr, int destPe) {
306     int ret = seqNoArr[destPe];
307     ret++;
308     return ret;
309 }
310 static void setNextMsgSeqNo(int *seqNoArr, int destPe, int val) {
311     /* the seq no. may fast-forward to a new round (i.e., starting from 1 again!) */
312     if (val>=MAX_MSG_SEQNO) val -= MAX_MSG_SEQNO;
313     seqNoArr[destPe] = val;
314 }
315
316 #define getNextExpectedMsgSeqNo(seqNoArr,pe) getNextMsgSeqNo(seqNoArr, pe)
317 #define setNextExpectedMsgSeqNo(seqNoArr,pe,val) setNextMsgSeqNo(seqNoArr, pe, val)
318
319 static int checkMsgInOrder(char *msg, MsgOrderInfo *info);
320 #endif
321 /* ===== End of functions regarding ensure in-order msg delivery ===== */
322
323
324 /* The machine-specific send function */
325 static CmiCommHandle MachineSendFuncForLAPI(int destNode, int size, char *msg, int mode);
326 #define CmiMachineSpecificSendFunc MachineSendFuncForLAPI
327
328 /* ### Beginning of Machine-startup Related Functions ### */
329 static void MachineInitForLAPI(int argc, char **argv, int *numNodes, int *myNodeID);
330 #define MachineSpecificInit MachineInitForLAPI
331
332 static void MachinePreCommonInitForLAPI(int everReturn);
333 static void MachinePostCommonInitForLAPI(int everReturn);
334 #define MachineSpecificPreCommonInit MachinePreCommonInitForLAPI
335 #define MachineSpecificPostCommonInit MachinePostCommonInitForLAPI
336 /* ### End of Machine-startup Related Functions ### */
337
338 /* ### Beginning of Machine-running Related Functions ### */
339 static void AdvanceCommunicationForLAPI();
340 #define MachineSpecificAdvanceCommunication AdvanceCommunicationForLAPI
341
342 static void DrainResourcesForLAPI(); /* used when exit */
343 #define MachineSpecificDrainResources DrainResourcesForLAPI
344
345 static void MachineExitForLAPI();
346 #define MachineSpecificExit MachineExitForLAPI
347 /* ### End of Machine-running Related Functions ### */
348
349 /* ### Beginning of Idle-state Related Functions ### */
350 /* ### End of Idle-state Related Functions ### */
351
352 void MachinePostNonLocalForLAPI();
353 #define MachineSpecificPostNonLocal MachinePostNonLocalForLAPI
354
355 /* =====End of Declarations of Machine Specific Functions===== */
356
357 /**
358  *  Macros that overwrites the common codes, such as
359  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
360  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
361  *  CMK_OFFLOAD_BCAST_PROCESS etc.
362  */
363 /* For async msg sending ops, using lapi specific implementations */
364 #define USE_COMMON_ASYNC_BCAST 0
365 #define CMK_OFFLOAD_BCAST_PROCESS 1
366 #include "machine-common.c"
367
368 /* The machine specific msg-sending function */
369
370 /* ######Beginning of functions for sending a msg ###### */
371 //lapi sending completion callback
372 /* The following two are callbacks for sync and async send respectively */
373 static void ReleaseMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
374     MACHSTATE2(2,"[%d] ReleaseMsg begin %p {",CmiMyNode(),msg);
375     check_lapi_err(info->reason, "ReleaseMsg", __LINE__);
376     CmiFree(msg);
377     MACHSTATE(2,"} ReleaseMsg end");
378 }
379
380 static void DeliveredMsg(lapi_handle_t *myLapiContext, void *msg, lapi_sh_info_t *info) {
381     MACHSTATE1(2,"[%d] DeliveredMsg begin {",CmiMyNode());
382     check_lapi_err(info->reason, "DeliveredMsg", __LINE__);
383     *((int *)msg) = *((int *)msg) - 1;
384     MACHSTATE(2,"} DeliveredMsg end");
385 }
386
387 static INLINE_KEYWORD void lapiSendFn(int destNode, int size, char *msg, scompl_hndlr_t *shdlr, void *sinfo) {
388     lapi_xfer_t xfer_cmd;
389
390     MACHSTATE3(2,"lapiSendFn to destNode=%d with msg %p (isImm=%d) begin {",destNode,msg, CmiIsImmediate(msg));
391     MACHSTATE3(2, "inside lapiSendFn 1: size=%d, sinfo=%p, deliverable=%d", size, sinfo, deliverable);
392
393     MACHSTATE2(2, "Ready to call LAPI_Xfer with destNode=%d, destRank=%d",destNode,CMI_DEST_RANK(msg));
394
395     xfer_cmd.Am.Xfer_type = LAPI_AM_XFER;
396     xfer_cmd.Am.flags     = 0;
397     xfer_cmd.Am.tgt       = destNode;
398     xfer_cmd.Am.hdr_hdl   = lapiHeaderHandler;
399     xfer_cmd.Am.uhdr_len  = 0;
400     xfer_cmd.Am.uhdr      = NULL;
401     xfer_cmd.Am.udata     = msg;
402     xfer_cmd.Am.udata_len = size;
403     xfer_cmd.Am.shdlr     = shdlr;
404     xfer_cmd.Am.sinfo     = sinfo;
405     xfer_cmd.Am.tgt_cntr  = NULL;
406     xfer_cmd.Am.org_cntr  = NULL;
407     xfer_cmd.Am.cmpl_cntr = NULL;
408
409     check_lapi(LAPI_Xfer,(lapiContext, &xfer_cmd));
410
411     MACHSTATE(2,"} lapiSendFn end");
412 }
413
414 static CmiCommHandle MachineSendFuncForLAPI(int destNode, int size, char *msg, int mode) {
415     scompl_hndlr_t *shdlr = NULL;
416     void *sinfo = NULL;
417
418     if (mode==P2P_SYNC) {
419         shdlr = ReleaseMsg;
420         sinfo = (void *)msg;
421     } else if (mode==P2P_ASYNC) {
422         shdlr = DeliveredMsg;
423         sinfo = malloc(sizeof(int));
424         *((int *)sinfo) = 1;
425     }
426
427     CMI_MSG_SIZE(msg) = size;
428
429 #if ENSURE_MSG_PAIRORDER
430 #if CMK_NODE_QUEUE_AVAILABLE
431     if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE) {
432         lapiSendFn(destNode, size, msg, shdlr, sinfo);
433         return sinfo;
434     }
435 #endif
436     int destPE = CmiNodeFirst(destNode)+CMI_DEST_RANK(msg);
437     CMI_MSG_SRCPE(msg) = CmiMyPe();
438     /* Note: This could be executed on comm threads, where CmiMyPe() >= CmiNumPes() */
439     CMI_MSG_SEQNO(msg) = getNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE);
440     setNextMsgSeqNo(CpvAccess(p2pMsgSeqInfo).nextMsgSeqNo, destPE, CMI_MSG_SEQNO(msg));
441 #endif
442
443     lapiSendFn(destNode, size, msg, shdlr, sinfo);
444     return sinfo;
445 }
446
447 /* Lapi-specific implementation of async msg sending operations */
448 #if !USE_COMMON_ASYNC_BCAST
449 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
450 #if ENSURE_MSG_PAIRORDER
451     /* Not sure how to add the msg seq no for async broadcast messages --Chao Mei */
452     /* so abort here ! */
453     CmiAssert(0);
454     return 0;
455 #else
456     int i, rank;
457     int mype = CmiMyPe();
458 #if ENABLE_CONVERSE_QD
459     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
460 #endif
461     MACHSTATE1(3,"[%d] Sending async broadcast message from {",CmiMyNode());
462     CMI_BROADCAST_ROOT(msg) = 0;
463     void *handle = malloc(sizeof(int));
464     *((int *)handle) = CmiNumPes()-1;
465
466     for (i=mype+1; i<CmiNumPes(); i++) {
467         CMI_DEST_RANK(msg) = CmiRankOf(i);
468         lapiSendFn(CmiNodeOf(i), size, msg, DeliveredMsg, handle);
469     }
470     for (i=0; i<mype; i++) {
471         CMI_DEST_RANK(msg) = CmiRankOf(i);
472         lapiSendFn(CmiNodeOf(i), size, msg, DeliveredMsg, handle);
473     }
474
475     MACHSTATE(3,"} Sending async broadcast message end");
476     return handle;
477 #endif
478 }
479
480 #if CMK_NODE_QUEUE_AVAILABLE
481 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
482     int i;
483
484 #if ENABLE_CONVERSE_QD
485     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
486 #endif
487
488     MACHSTATE1(3,"[%d] Sending async node broadcast message from {",CmiMyNode());
489     CMI_BROADCAST_ROOT(msg) = 0;
490     CMI_DEST_RANK(msg) =DGRAM_NODEMESSAGE;
491     void *handle = malloc(sizeof(int));
492     *((int *)handle) = CmiNumNodes()-1;
493     for (i=CmiMyNode()+1; i<CmiNumNodes(); i++) {
494         lapiSendFn(i, size, msg, DeliveredMsg, handle);
495     }
496     for (i=0; i<CmiMyNode(); i++) {
497         lapiSendFn(i, size, msg, DeliveredMsg, handle);
498     }
499
500     MACHSTATE(3,"} Sending async broadcast message end");
501     return handle;
502 }
503 #endif
504 #endif/* end of !USE_COMMON_ASYNC_BCAST */
505
506 int CmiAsyncMsgSent(CmiCommHandle handle) {
507     return (*((int *)handle) == 0)?1:0;
508 }
509
510 void CmiReleaseCommHandle(CmiCommHandle handle) {
511 #ifndef CMK_OPTIMIZE
512     if (*((int *)handle) != 0) CmiAbort("Released a CmiCommHandle not free!");
513 #endif
514     free(handle);
515 }
516 /* ######End of functions for sending a msg ###### */
517
518 /* ######Beginning of functions for receiving a msg ###### */
519 /* lapi recv callback when the first packet of the msg arrives as header handler*/
520 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
521                            void *hdr, uint *uhdr_len,
522                            lapi_return_info_t *msg_info,
523                            compl_hndlr_t **comp_h, void **comp_am_info);
524 /* lapi recv completion callback when all the msg is received */
525 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info);
526
527 /** lapi header handler: executed on the recv side, when the
528  *  first packet of the recving msg arrives, it is called to
529  *  prepare the memory buffer in the user space for recving the
530  *  data --Chao Mei
531  */
532 static void* PumpMsgsBegin(lapi_handle_t *myLapiContext,
533                            void *hdr, uint *uhdr_len,
534                            lapi_return_info_t *msg_info,
535                            compl_hndlr_t **comp_h, void **comp_am_info) {
536     void *msg_buf;
537     MACHSTATE1(2,"[%d] PumpMsgsBegin begin {",CmiMyNode());
538     /* prepare the space for receiving the data, set the completion handler to
539        be executed inline */
540     msg_buf = (void *)CmiAlloc(msg_info->msg_len);
541
542     msg_info->ret_flags = LAPI_SEND_REPLY;
543     *comp_h = PumpMsgsComplete;
544     *comp_am_info = msg_buf;
545     MACHSTATE(2,"} PumpMsgsBegin end");
546     return msg_buf;
547
548 }
549
550 /**
551   * lapi completion handler on the recv side. It's responsible to push messages
552   * to the destination proc or relay broadcast messages. --Chao Mei
553   *
554   * Note: The completion handler could be executed on any cores within a node ???
555   * So in SMP mode when there's a comm thread, the completion handler should be carefully
556   * dealt with.
557   *
558   * Given lapi also provides an internal lapi thread to deal with network progress which
559   * will call this function (???), we should be careful with the following situations:
560   * 1) non SMP mode, with interrupt (lapi internal completion thread)
561   * 2) non SMP mode, with polling (machine layer is responsible for network progress)
562   * 3) SMP mode, no comm thread, with polling
563   * 4) SMP mode, no comm thread, with interrupt
564   * 5) SMP mode, with comm thread, with polling (not yet implemented, comm server is empty right now)
565   * 6) SMP mode, with comm thread, with interrupt??
566   *
567   * Currently, SMP mode without comm thread is undergoing implementation.
568   *
569   * This function is executed by LAPI internal threads. It seems that the number of internal
570   * completion handler threads could vary during the program. LAPI adaptively creates more
571   * threads if there are more outstanding messages!!!! This means pcqueue needs protection
572   * even in the nonsmp case!!!!
573   *
574   * --Chao Mei
575   */
576 static void PumpMsgsComplete(lapi_handle_t *myLapiContext, void *am_info) {
577     int i;
578     char *msg = am_info;
579     int broot, destrank;
580
581     MACHSTATE3(2,"[%d] PumpMsgsComplete with msg %p (isImm=%d) begin {",CmiMyNode(), msg, CmiIsImmediate(msg));
582 #if ENSURE_MSG_PAIRORDER
583     MACHSTATE3(2,"msg %p info: srcpe=%d, seqno=%d", msg, CMI_MSG_SRCPE(msg), CMI_MSG_SEQNO(msg));
584 #endif
585     /**
586      * First, we check if the msg is a broadcast msg via spanning
587      * tree. If it is, it needs to call SendSpanningChildren to
588      * relay the broadcast, and then send the msg to every cores on
589      * this node.
590      *
591      * After the first check, we deal with normal messages.
592      * --Chao Mei
593      */
594     /* It's the right place to relay the broadcast message */
595     /**
596      * 1. For in-order delivery, because this is the handler for
597      * receiving a message, and we assume the cross-network msgs are
598      * always delivered to the first proc (rank 0) of this node, we
599      * select the srcpe of the bcast msgs and the next msg seq no
600      * correspondingly.
601      *
602      * --Chao Mei
603      */
604 #if ENSURE_MSG_PAIRORDER
605     broot = CMI_BROADCAST_ROOT(msg);
606     destrank = CMI_DEST_RANK(msg);
607     /* Only check proc-level msgs */
608     if (broot>=0
609 #if CMK_NODE_QUEUE_AVAILABLE
610             && destrank != DGRAM_NODEMESSAGE
611 #endif
612        ) {
613         MsgOrderInfo *info;
614         info = &CpvAccessOther(p2pMsgSeqInfo, destrank);
615         MACHSTATE1(2, "Check msg in-order for p2p msg %p", msg);
616
617         if (checkMsgInOrder(msg,info)) {
618             MACHSTATE(2,"} PumpMsgsComplete end ");
619             return;
620         }
621     }
622 #endif
623
624     handleOneRecvedMsg(CMI_MSG_SIZE(msg), msg);
625
626     MACHSTATE(2,"} PumpMsgsComplete end ");
627     return;
628 }
629
630 /* utility function for ensuring the message pair-ordering */
631 #if ENSURE_MSG_PAIRORDER
632 /* return 1 if this msg is an out-of-order incoming message */
633 /**
634  * Returns 1 if this "msg" is an out-of-order message, or
635  * this "msg" is a late message which triggers the process
636  * of all buffered ooo msgs.
637  * --Chao Mei
638  */
639 static int checkMsgInOrder(char *msg, MsgOrderInfo *info) {
640     int srcpe, destrank;
641     int incomingSeqNo, expectedSeqNo;
642     int curOffset, maxOffset;
643     int i, curWinSize;
644     void **destMsgBuffer = NULL;
645
646     /* numMsg is the number of msgs to be processed in this buffer*/
647     /* Reason to have this extra copy of msgs to be processed: Reduce the atomic granularity */
648     void **toProcessMsgBuffer;
649     int numMsgs = 0;
650
651     srcpe = CMI_MSG_SRCPE(msg);
652     destrank = CMI_DEST_RANK(msg);
653     incomingSeqNo = CMI_MSG_SEQNO(msg);
654
655     CmiLock(cmplHdlrThdLock);
656
657     expectedSeqNo = getNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe);
658     if (expectedSeqNo == incomingSeqNo) {
659         /* Two cases: has ooo msg buffered or not */
660         maxOffset = (info->oooMaxOffset)[srcpe];
661         if (maxOffset>0) {
662             MACHSTATE1(4, "Processing all buffered ooo msgs (maxOffset=%d) including the just recved begin {", maxOffset);
663             curWinSize = info->CUR_WINDOW_SIZE[srcpe];
664             toProcessMsgBuffer = malloc((curWinSize+1)*sizeof(void *));
665             /* process the msg just recved */
666             toProcessMsgBuffer[numMsgs++] = msg;
667             /* process the buffered ooo msg until the first empty slot in the window */
668             destMsgBuffer = (info->oooMsgBuffer)[srcpe];
669             for (curOffset=0; curOffset<maxOffset; curOffset++) {
670                 char *curMsg = destMsgBuffer[curOffset];
671                 if (curMsg == NULL) {
672                     CmiAssert(curOffset!=(maxOffset-1));
673                     break;
674                 }
675                 toProcessMsgBuffer[numMsgs++] = curMsg;
676                 destMsgBuffer[curOffset] = NULL;
677             }
678             /* Update expected seqno, maxOffset and slide the window */
679             if (curOffset < maxOffset) {
680                 int i;
681                 /**
682                  * now, the seqno of the next to-be-recved msg should be
683                  * "expectedSeqNo+curOffset+1" as the seqno of the just
684                  * processed msg is "expectedSeqNo+curOffset. We need to slide
685                  * the msg buffer window from "curOffset+1" because the first
686                  * element of the buffer window should always points to the ooo
687                  * msg that's 1 in terms of seqno ahead of the next to-be-recved
688                  * msg. --Chao Mei
689                  */
690
691                 /* moving [curOffset+1, maxOffset) to [0, maxOffset-curOffset-1) in the window */
692                 /* The following two loops could be combined --Chao Mei */
693                 for (i=0; i<maxOffset-curOffset-1; i++) {
694                     destMsgBuffer[i] = destMsgBuffer[curOffset+i+1];
695                 }
696                 for (i=maxOffset-curOffset-1; i<maxOffset; i++) {
697                     destMsgBuffer[i] = NULL;
698                 }
699                 (info->oooMaxOffset)[srcpe] = maxOffset-curOffset-1;
700                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+curOffset);
701             } else {
702                 /* there's no remaining buffered ooo msgs */
703                 (info->oooMaxOffset)[srcpe] = 0;
704                 setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo+maxOffset);
705             }
706
707             CmiUnlock(cmplHdlrThdLock);
708
709             /* Process the msgs */
710             for (i=0; i<numMsgs; i++) {
711                 char *curMsg = toProcessMsgBuffer[i];
712                 if (CMI_BROADCAST_ROOT(curMsg)>0) {
713
714 #if CMK_OFFLOAD_BCAST_PROCESS
715                     PCQueuePush(CsvAccess(procBcastQ), curMsg);
716 #else
717                     processProcBcastMsg(CMI_MSG_SIZE(curMsg), curMsg);
718 #endif
719                 } else {
720                     CmiPushPE(CMI_DEST_RANK(curMsg), curMsg);
721                 }
722             }
723
724             free(toProcessMsgBuffer);
725
726             MACHSTATE1(4, "Processing all buffered ooo msgs (actually processed %d) end }", curOffset);
727             /**
728              * Since we have processed all buffered ooo msgs including
729              * this just recved one, 1 should be returned so that this
730              * msg no longer needs processing
731              */
732             return 1;
733         } else {
734             /* An expected msg recved without any ooo msg buffered */
735             MACHSTATE1(4, "Receiving an expected msg with seqno=%d\n", incomingSeqNo);
736             setNextExpectedMsgSeqNo(info->expectedMsgSeqNo, srcpe, expectedSeqNo);
737
738             CmiUnlock(cmplHdlrThdLock);
739             return 0;
740         }
741     }
742
743     MACHSTATE2(4, "Receiving an out-of-order msg with seqno=%d, but expect seqno=%d", incomingSeqNo, expectedSeqNo);
744     curWinSize = info->CUR_WINDOW_SIZE[srcpe];
745     if ((info->oooMsgBuffer)[srcpe]==NULL) {
746         (info->oooMsgBuffer)[srcpe] = malloc(curWinSize*sizeof(void *));
747         memset((info->oooMsgBuffer)[srcpe], 0, curWinSize*sizeof(void *));
748     }
749     destMsgBuffer = (info->oooMsgBuffer)[srcpe];
750     curOffset = incomingSeqNo - expectedSeqNo;
751     maxOffset = (info->oooMaxOffset)[srcpe];
752     if (curOffset<0) {
753         /* It's possible that the seqNo starts with another round (exceeding MAX_MSG_SEQNO) with 1 */
754         curOffset += MAX_MSG_SEQNO;
755     }
756     if (curOffset > curWinSize) {
757         int newWinSize;
758         if (curOffset > MAX_WINDOW_SIZE) {
759             CmiAbort("Exceeding the MAX_WINDOW_SIZE!\n");
760         }
761         newWinSize = ((curOffset/curWinSize)+1)*curWinSize;
762         /*CmiPrintf("[%d]: WARNING: INCREASING WINDOW SIZE FROM %d TO %d\n", CmiMyPe(), curWinSize, newWinSize);*/
763         (info->oooMsgBuffer)[srcpe] = malloc(newWinSize*sizeof(void *));
764         memset((info->oooMsgBuffer)[srcpe], 0, newWinSize*sizeof(void *));
765         memcpy((info->oooMsgBuffer)[srcpe], destMsgBuffer, curWinSize*sizeof(void *));
766         info->CUR_WINDOW_SIZE[srcpe] = newWinSize;
767         free(destMsgBuffer);
768         destMsgBuffer = (info->oooMsgBuffer)[srcpe];
769     }
770     CmiAssert(destMsgBuffer[curOffset-1] == NULL);
771     destMsgBuffer[curOffset-1] = msg;
772     if (curOffset > maxOffset) (info->oooMaxOffset)[srcpe] = curOffset;
773
774     CmiUnlock(cmplHdlrThdLock);
775     return 1;
776 }
777 #endif /* end of ENSURE_MSG_PAIRORDER */
778
779 /* ######End of functions for receiving a msg ###### */
780
781 /* ######Beginning of functions related with communication progress ###### */
782
783 static INLINE_KEYWORD void AdvanceCommunicationForLAPI() {
784     /* What about CMK_SMP_NO_COMMTHD in the original implementation?? */
785     /* It does nothing but sleep */
786     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
787 }
788 /* ######End of functions related with communication progress ###### */
789
790 void MachinePostNonLocalForLAPI() {
791     /* None here */
792 }
793
794 /**
795  * TODO: What will be the effects if calling LAPI_Probe in the
796  * interrupt mode??? --Chao Mei
797  */
798 /* Network progress function is used to poll the network when for
799    messages. This flushes receive buffers on some  implementations*/
800 #if CMK_MACHINE_PROGRESS_DEFINED
801 void CmiMachineProgressImpl() {
802     if (!CsvAccess(lapiInterruptMode)) check_lapi(LAPI_Probe,(lapiContext));
803
804 #if CMK_IMMEDIATE_MSG
805     MACHSTATE1(2, "[%d] Handling Immediate Message begin {",CmiMyNode());
806     CmiHandleImmediate();
807     MACHSTATE1(2, "[%d] Handling Immediate Message end }",CmiMyNode());
808 #endif
809
810 #if CMK_SMP && !CMK_SMP_NO_COMMTHD && CMK_OFFLOAD_BCAST_PROCESS
811     if (CmiMyRank()==CmiMyNodeSize()) processBcastQs(); /* FIXME ????????????????*/
812 #endif
813 }
814 #endif
815
816 /* ######Beginning of functions related with exiting programs###### */
817 void DrainResourcesForLAPI() {
818     /* None here */
819 }
820
821 void MachineExitForLAPI(void) {
822     check_lapi(LAPI_Gfence, (lapiContext));
823     check_lapi(LAPI_Term, (lapiContext));
824     exit(EXIT_SUCCESS);
825 }
826 /* ######End of functions related with exiting programs###### */
827
828
829 /* ######Beginning of functions related with starting programs###### */
830 /**
831  *  Obtain the number of nodes, my node id, and consuming machine layer
832  *  specific arguments
833  */
834 static void MachineInitForLAPI(int argc, char **argv, int *numNodes, int *myNodeID) {
835
836     lapi_info_t info;
837
838     memset(&info,0,sizeof(info));
839
840     /* Register error handler (redundant?) -- added by Chao Mei*/
841     info.err_hndlr = (LAPI_err_hndlr *)lapi_err_hndlr;
842
843     /* Indicates the number of completion handler threads to create */
844     /* The number of completion hndlr thds will affect the atomic PCQueue operations!! */
845     /* NOTE: num_compl_hndlr_thr is obsolete now! --Chao Mei */
846     /* info.num_compl_hndlr_thr = 1; */
847
848     check_lapi(LAPI_Init,(&lapiContext, &info));
849
850     /* It's a good idea to start with a fence,
851        because packets recv'd before a LAPI_Init are just dropped. */
852     check_lapi(LAPI_Gfence,(lapiContext));
853
854     check_lapi(LAPI_Qenv,(lapiContext, TASK_ID, myNodeID));
855     check_lapi(LAPI_Qenv,(lapiContext, NUM_TASKS, numNodes));
856
857     /* Make polling as the default mode as real apps have better perf */
858     CsvAccess(lapiInterruptMode) = 0;
859     if (CmiGetArgFlag(argv,"+poll")) CsvAccess(lapiInterruptMode) = 0;
860     if (CmiGetArgFlag(argv,"+nopoll")) CsvAccess(lapiInterruptMode) = 1;
861
862     check_lapi(LAPI_Senv,(lapiContext, ERROR_CHK, lapiDebugMode));
863     check_lapi(LAPI_Senv,(lapiContext, INTERRUPT_SET, CsvAccess(lapiInterruptMode)));
864
865     if (*myNodeID == 0) {
866         printf("Running lapi in interrupt mode: %d\n", CsvAccess(lapiInterruptMode));
867         printf("Running lapi with %d completion handler threads.\n", info.num_compl_hndlr_thr);
868     }
869
870     /**
871      *  Associate PumpMsgsBegin with var "lapiHeaderHandler". Then inside Xfer calls,
872      *  lapiHeaderHandler could be used to indicate the callback
873      *  instead of PumpMsgsBegin --Chao Mei
874      */
875     check_lapi(LAPI_Addr_set,(lapiContext,(void *)PumpMsgsBegin,lapiHeaderHandler));
876
877     if (CmiGetArgFlag(argv,"++debug")) {  /*Pause so user has a chance to start and attach debugger*/
878         printf("CHARMDEBUG> Processor %d has PID %d\n",*myNodeID,getpid());
879         if (!CmiGetArgFlag(argv,"++debug-no-pause"))
880             sleep(30);
881     }
882
883 #if ENSURE_MSG_PAIRORDER
884     cmplHdlrThdLock = CmiCreateLock();
885 #endif
886 }
887
888 #if MACHINE_DEBUG_LOG
889 CpvDeclare(FILE *, debugLog);
890 #endif
891
892 #if ENSURE_MSG_PAIRORDER
893 static void initMsgOrderInfo(MsgOrderInfo *info) {
894     int i;
895     int totalPEs = CmiNumPes();
896 #if CMK_SMP && CMK_OFFLOAD_BCAST_PROCESS
897     /* the comm thread will also access such info */
898     totalPEs += CmiNumNodes();
899 #endif
900     info->nextMsgSeqNo = malloc(totalPEs*sizeof(int));
901     memset(info->nextMsgSeqNo, 0, totalPEs*sizeof(int));
902
903     info->expectedMsgSeqNo = malloc(totalPEs*sizeof(int));
904     memset(info->expectedMsgSeqNo, 0, totalPEs*sizeof(int));
905
906     info->oooMsgBuffer = malloc(totalPEs*sizeof(void **));
907     memset(info->oooMsgBuffer, 0, totalPEs*sizeof(void **));
908
909     info->oooMaxOffset = malloc(totalPEs*sizeof(unsigned char));
910     memset(info->oooMaxOffset, 0, totalPEs*sizeof(unsigned char));
911
912     info->CUR_WINDOW_SIZE = malloc(totalPEs*sizeof(unsigned char));
913     for (i=0; i<totalPEs; i++) info->CUR_WINDOW_SIZE[i] = INIT_WINDOW_SIZE;
914 }
915 #endif
916
917 static void MachinePreCommonInitForLAPI(int everReturn) {
918 #if ENSURE_MSG_PAIRORDER
919     CpvInitialize(MsgOrderInfo, p2pMsgSeqInfo);
920     initMsgOrderInfo(&CpvAccess(p2pMsgSeqInfo));
921 #endif
922
923 #if MACHINE_DEBUG_LOG
924     {
925         char ln[200];
926         sprintf(ln,"debugLog.%d",CmiMyPe());
927         CpvInitialize(FILE *, debugLog);
928         CpvAccess(debugLog)=fopen(ln,"w");
929     }
930 #endif
931
932 }
933
934 static void MachinePostCommonInitForLAPI(int everReturn) {
935     /**
936          * In SMP, the machine layer usually has one comm thd, and it is
937          * designed to be responsible for all network communication. So
938          * if there's no dedicated processor for the comm thread, it has
939          * to share a proc with a worker thread. In this scenario,
940          * the worker thread needs to yield for some time to give CPU
941          * time to comm thread. However, in current configuration, we
942          * will always dedicate one proc for the comm thd, therefore,
943          * such yielding scheme is not necessary.  Besides, avoiding
944          * this yielding scheme improves performance because worker
945          * thread doesn't need to yield and will be more responsive to
946          * incoming messages. So, we will always use CmiNotifyIdle
947          * instead.
948          *
949          * --Chao Mei
950          */
951
952     /* Not registering any Idle-state related functions right now!! */
953
954 #if 0 && CMK_SMP
955     s=CmiNotifyGetState();
956     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
957     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
958 #else
959     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
960 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
961     /* If there's comm thread, then comm thd is responsible for advancing comm */
962     if (!CsvAccess(lapiInterruptMode)) {
963         CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn)AdvanceCommunicationForLAPI, NULL);
964     }
965 #endif
966 #endif
967 }
968 /* ######End of functions related with starting programs###### */
969
970 /***********************************************************************
971  *
972  * Abort function:
973  *
974  ************************************************************************/
975
976 void CmiAbort(const char *message) {
977     CmiError(message);
978     LAPI_Term(lapiContext);
979     exit(1);
980 }
981
982 /* What's the difference of this function with CmiAbort????
983    and whether LAPI_Term is needed?? It should be shared across
984    all machine layers.
985 static void PerrorExit(const char *msg) {
986     perror(msg);
987     LAPI_Term(lapiContext);
988     exit(1);
989 }
990 */
991
992 /* Barrier related functions */
993 /*TODO: does lapi provide any Barrrier related functions as DCMF provides??? --Chao Mei */
994 /* Barrier needs to be implemented!!! -Chao Mei */
995 /* These two barriers are only needed by CmiTimerInit to synchronize all the
996    threads. They do not need to provide a general barrier. */
997 int CmiBarrier() {
998     return 0;
999 }
1000 int CmiBarrierZero() {
1001     return 0;
1002 }
1003
1004
1005 /*@}*/
1006