mpi-bluegeneq-smp changes.
[charm.git] / src / arch / portals-crayxt3 / machine.c
1 /** @file
2  * Portals based machine layer
3  * @ingroup Machine
4  */
5 /*@{*/
6
7 #include <stdio.h>
8 #include <errno.h>
9 #include <portals/portals3.h>
10 #include <catamount/cnos_mpi_os.h>
11
12 #include "converse.h"
13
14 /*Support for ++debug: */
15 #include <unistd.h> /*For getpid()*/
16 #include <stdlib.h> /*For sleep()*/
17
18 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
19 #define CMK_SMP 1
20 #endif
21
22 #include "machine.h"
23
24 #include "pcqueue.h"
25
26 extern ptl_handle_ni_t _ni_handle;
27 int mynid, mypid;
28
29 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
30
31 #if CMK_VERSION_BLUEGENE
32 #define MAX_QLEN 8
33 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
34 #else
35 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
36 #define MAX_QLEN 200
37 #endif
38
39
40 /*
41     To reduce the buffer used in broadcast and distribute the load from 
42   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of 
43   spanning tree broadcast algorithm.
44     This will use the fourth short in message as an indicator of spanning tree
45   root.
46 */
47 #if CMK_SMP
48 #define CMK_BROADCAST_SPANNING_TREE    0
49 #else
50 #define CMK_BROADCAST_SPANNING_TREE    1
51 #define CMK_BROADCAST_HYPERCUBE        0
52 #endif
53
54 #define BROADCAST_SPANNING_FACTOR      4
55
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
58
59 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
60 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
61
62 /* FIXME: need a random number that everyone agrees ! */
63 #define CHARM_MAGIC_NUMBER               126
64
65 #if !CMK_OPTIMIZE
66 static int checksum_flag = 0;
67 #define CMI_SET_CHECKSUM(msg, len)      \
68         if (checksum_flag)  {   \
69           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
70           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
71         }
72 #define CMI_CHECK_CHECKSUM(msg, len)    \
73         if (checksum_flag)      \
74           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
75             CmiAbort("Fatal error: checksum doesn't agree!\n");
76 #else
77 #define CMI_SET_CHECKSUM(msg, len)
78 #define CMI_CHECK_CHECKSUM(msg, len)
79 #endif
80
81 #if CMK_BROADCAST_SPANNING_TREE
82 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
83 #else
84 #  define CMI_SET_BROADCAST_ROOT(msg, root)
85 #endif
86
87 #if CMK_BROADCAST_HYPERCUBE
88 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
89 #else
90 #  define CMI_SET_CYCLE(msg, cycle)
91 #endif
92
93 /*
94  to avoid MPI's in order delivery, changing MPI Tag all the time
95 */
96 #define TAG     1375
97 /*
98 static int mpi_tag = TAG;
99 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
100 */
101
102 int               _Cmi_numpes;
103 int               _Cmi_mynode;    /* Which address space am I */
104 int               _Cmi_mynodesize;/* Number of processors in my address space */
105 int               _Cmi_numnodes;  /* Total number of address spaces */
106 int               _Cmi_numpes;    /* Total number of processors */
107 static int        Cmi_nodestart; /* First processor in this address space */ 
108 CpvDeclare(void*, CmiLocalQueue);
109
110 /*Network progress utility variables. Period controls the rate at
111   which the network poll is called */
112 CpvDeclare(unsigned , networkProgressCount);
113 int networkProgressPeriod;
114
115 int               idleblock = 0;
116
117 #define BLK_LEN  512
118
119 #if CMK_NODE_QUEUE_AVAILABLE
120 #define DGRAM_NODEMESSAGE   (0xFB)
121
122 #define NODE_BROADCAST_OTHERS (-1)
123 #define NODE_BROADCAST_ALL    (-2)
124 #endif
125
126 static void ConverseRunPE(int everReturn);
127 static void CommunicationServer(int sleepTime);
128 static void CommunicationServerThread(int sleepTime);
129
130 typedef struct msg_list {
131      char *msg;
132      struct msg_list *next;
133      int size, destpe;
134 #if 0
135      MPI_Request req;
136 #endif
137 } SMSG_LIST;
138
139 int MsgQueueLen=0;
140 static int request_max;
141
142 static SMSG_LIST *sent_msgs=0;
143 static SMSG_LIST *end_sent=0;
144
145 static int Cmi_dim;
146
147 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
148
149 #if NODE_0_IS_CONVHOST
150 int inside_comm = 0;
151 #endif
152
153 void CmiAbort(const char *message);
154 static void PerrorExit(const char *msg);
155
156 void SendSpanningChildren(int size, char *msg);
157 void SendHypercube(int size, char *msg);
158
159 static void PerrorExit(const char *msg)
160 {
161   perror(msg);
162   exit(1);
163 }
164
165 extern unsigned char computeCheckSum(unsigned char *data, int len);
166
167 void CmiBarrier()
168 {
169 #if 0
170   if (CmiMyRank() == 0)
171     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
172         CmiAbort("Timernit: MPI_Barrier failed!\n");
173 #endif
174 }
175
176 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
177 void CmiBarrierZero()
178 {
179 #if 0
180   int i;
181   if (CmiMyRank() == 0) {
182     char msg[1];
183     MPI_Status sts;
184     if (CmiMyNode() == 0)  {
185       for (i=0; i<CmiNumNodes()-1; i++) {
186         if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
187           printf("MPI_Recv failed!\n");
188       }
189     }
190     else {
191       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,TAG,MPI_COMM_WORLD))
192          printf("MPI_Send failed!\n");
193     }
194   }
195   CmiNodeAllBarrier();
196 #endif
197 }
198
199 typedef struct ProcState {
200 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
201 CmiNodeLock  recvLock;              /* for cs->recv */
202 } ProcState;
203
204 static ProcState  *procState;
205
206 #if CMK_SMP
207
208 static PCQueue sendMsgBuf;
209 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
210
211 #endif
212
213 /************************************************************
214  * 
215  * Processor state structure
216  *
217  ************************************************************/
218
219 /* fake Cmi_charmrun_fd */
220 static int Cmi_charmrun_fd = 0;
221 #include "machine-smp.c"
222
223 CsvDeclare(CmiNodeState, NodeState);
224
225 #include "immediate.c"
226
227 #if ! CMK_SMP
228 /************ non SMP **************/
229 static struct CmiStateStruct Cmi_state;
230 int _Cmi_mype;
231 int _Cmi_myrank;
232
233 void CmiMemLock(void) {}
234 void CmiMemUnlock(void) {}
235
236 #define CmiGetState() (&Cmi_state)
237 #define CmiGetStateN(n) (&Cmi_state)
238
239 void CmiYield(void) { sleep(0); }
240
241 static void CmiStartThreads(char **argv)
242 {
243   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
244   _Cmi_mype = Cmi_nodestart;
245   _Cmi_myrank = 0;
246 }      
247 #endif  /* non smp */
248
249 /*Add a message to this processor's receive queue, pe is a rank */
250 void CmiPushPE(int pe,void *msg)
251 {
252   CmiState cs = CmiGetStateN(pe);
253   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
254 #if CMK_IMMEDIATE_MSG
255   if (CmiIsImmediate(msg)) {
256 /*
257 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
258     CmiHandleMessage(msg);
259 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
260 */
261     /**(CmiUInt2 *)msg = pe;*/
262     CMI_DEST_RANK(msg) = pe;
263     CmiPushImmediateMsg(msg);
264     return;
265   }
266 #endif
267
268 #if CMK_SMP
269   CmiLock(procState[pe].recvLock);
270 #endif
271   PCQueuePush(cs->recv,msg);
272 #if CMK_SMP
273   CmiUnlock(procState[pe].recvLock);
274 #endif
275   CmiIdleLock_addMessage(&cs->idle); 
276   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
277 }
278
279 #if CMK_NODE_QUEUE_AVAILABLE
280 /*Add a message to this processor's receive queue */
281 static void CmiPushNode(void *msg)
282 {
283   MACHSTATE(3,"Pushing message into NodeRecv queue");
284 #if CMK_IMMEDIATE_MSG
285   if (CmiIsImmediate(msg)) {
286     CMI_DEST_RANK(msg) = 0;
287     CmiPushImmediateMsg(msg);
288     return;
289   }
290 #endif
291   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
292   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
293   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
294   {
295   CmiState cs=CmiGetStateN(0);
296   CmiIdleLock_addMessage(&cs->idle);
297   }
298 }
299 #endif
300
301 #ifndef CmiMyPe
302 int CmiMyPe(void)
303 {
304   return CmiGetState()->pe;
305 }
306 #endif
307
308 #ifndef CmiMyRank
309 int CmiMyRank(void)
310 {
311   return CmiGetState()->rank;
312 }
313 #endif
314
315 #ifndef CmiNodeFirst
316 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
317 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
318 #endif
319
320 #ifndef CmiNodeOf
321 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
322 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
323 #endif
324
325 static int CmiAllAsyncMsgsSent(void)
326 {
327 #if 0
328    SMSG_LIST *msg_tmp = sent_msgs;
329    MPI_Status sts;
330    int done;
331      
332    while(msg_tmp!=0) {
333     done = 0;
334     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts)) 
335       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
336     if(!done)
337       return 0;
338     msg_tmp = msg_tmp->next;
339 /*    MsgQueueLen--; ????? */
340    }
341    return 1;
342 #endif
343 }
344
345 int CmiAsyncMsgSent(CmiCommHandle c) {
346 #if 0     
347   SMSG_LIST *msg_tmp = sent_msgs;
348   int done;
349   MPI_Status sts;
350
351   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
352     msg_tmp = msg_tmp->next;
353   if(msg_tmp) {
354     done = 0;
355     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts)) 
356       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
357     return ((done)?1:0);
358   } else {
359     return 1;
360   }
361 #endif
362 }
363
364 void CmiReleaseCommHandle(CmiCommHandle c)
365 {
366   return;
367 }
368
369 #if CMK_VERSION_BLUEGENE
370 extern void MPID_Progress_test();
371 #endif
372
373 void CmiReleaseSentMessages(void)
374 {
375 #if 0
376   SMSG_LIST *msg_tmp=sent_msgs;
377   SMSG_LIST *prev=0;
378   SMSG_LIST *temp;
379   int done;
380   MPI_Status sts;
381
382   
383 #if CMK_VERSION_BLUEGENE
384   MPID_Progress_test();
385 #endif
386  
387   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
388   while(msg_tmp!=0) {
389     done =0;
390     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
391       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
392     if(done) {
393       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
394       MsgQueueLen--;
395       /* Release the message */
396       temp = msg_tmp->next;
397       if(prev==0)  /* first message */
398         sent_msgs = temp;
399       else
400         prev->next = temp;
401       CmiFree(msg_tmp->msg);
402       CmiFree(msg_tmp);
403       msg_tmp = temp;
404     } else {
405       prev = msg_tmp;
406       msg_tmp = msg_tmp->next;
407     }
408   }
409   end_sent = prev;
410   MACHSTATE(2,"} CmiReleaseSentMessages end");
411 #endif
412 }
413
414 int PumpMsgs(void)
415 {
416 #if 0
417   int nbytes, flg, res;
418   char *msg;
419   MPI_Status sts;
420   int recd=0;
421
422 #if CMK_VERSION_BLUEGENE
423   MPID_Progress_test();
424 #endif
425
426   MACHSTATE(2,"PumpMsgs begin {");
427   while(1) {
428     flg = 0;
429     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
430     if(res != MPI_SUCCESS)
431       CmiAbort("MPI_Iprobe failed\n");
432     if(!flg) break;
433     recd = 1;
434     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
435     msg = (char *) CmiAlloc(nbytes);
436     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts)) 
437       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
438
439     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
440     CMI_CHECK_CHECKSUM(msg, nbytes);
441     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
442       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
443       CmiFree(msg);
444       CmiAbort("Abort!\n");
445       continue;
446     }
447 #if CMK_NODE_QUEUE_AVAILABLE
448     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
449       CmiPushNode(msg);
450     else
451 #endif
452       CmiPushPE(CMI_DEST_RANK(msg), msg);
453
454 #if CMK_BROADCAST_SPANNING_TREE
455     if (CMI_BROADCAST_ROOT(msg))
456       SendSpanningChildren(nbytes, msg);
457 #elif CMK_BROADCAST_HYPERCUBE
458     if (CMI_GET_CYCLE(msg))
459       SendHypercube(nbytes, msg);
460 #endif
461   }
462 #if CMK_IMMEDIATE_MSG && !CMK_SMP
463   CmiHandleImmediate();
464 #endif
465   MACHSTATE(2,"} PumpMsgs end ");
466   return recd;
467 #endif
468 }
469
470 /* blocking version */
471 static void PumpMsgsBlocking(void)
472 {
473 #if 0
474   static int maxbytes = 20000000;
475   static char *buf = NULL;
476   int nbytes, flg;
477   MPI_Status sts;
478   char *msg;
479   int recd=0;
480
481   if (!PCQueueEmpty(CmiGetState()->recv)) return; 
482   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
483   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
484   if (sent_msgs)  return;
485
486 #if 0
487   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
488 #endif
489
490   if (buf == NULL) {
491     buf = (char *) CmiAlloc(maxbytes);
492     _MEMCHECK(buf);
493   }
494
495   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts)) 
496       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
497    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
498    msg = (char *) CmiAlloc(nbytes);
499    memcpy(msg, buf, nbytes);
500
501 #if CMK_NODE_QUEUE_AVAILABLE
502    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
503       CmiPushNode(msg);
504    else
505 #endif
506       CmiPushPE(CMI_DEST_RANK(msg), msg);
507
508 #if CMK_BROADCAST_SPANNING_TREE
509    if (CMI_BROADCAST_ROOT(msg))
510       SendSpanningChildren(nbytes, msg);
511 #elif CMK_BROADCAST_HYPERCUBE
512    if (CMI_GET_CYCLE(msg))
513       SendHypercube(nbytes, msg);
514 #endif
515 #endif
516 }
517
518 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
519
520 #if CMK_SMP
521
522 static int inexit = 0;
523
524 static int MsgQueueEmpty()
525 {
526   int i;
527 #if 0
528   for (i=0; i<_Cmi_mynodesize; i++)
529     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
530 #else
531   return PCQueueEmpty(sendMsgBuf);
532 #endif
533   return 1;
534 }
535
536 static int SendMsgBuf();
537
538 /* test if all processors recv queues are empty */
539 static int RecvQueueEmpty()
540 {
541   int i;
542   for (i=0; i<_Cmi_mynodesize; i++) {
543     CmiState cs=CmiGetStateN(i);
544     if (!PCQueueEmpty(cs->recv)) return 0;
545   }
546   return 1;
547 }
548
549 /**
550 CommunicationServer calls MPI to send messages in the queues and probe message from network.
551 */
552 static void CommunicationServer(int sleepTime)
553 {
554   int static count=0;
555 /*
556   count ++;
557   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
558 */
559   PumpMsgs();
560   CmiReleaseSentMessages();
561   SendMsgBuf(); 
562 /*
563   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
564 */
565   if (inexit == CmiMyNodeSize()) {
566     MACHSTATE(2, "CommunicationServer exiting {");
567 #if 0
568     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
569 #endif
570     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
571       CmiReleaseSentMessages();
572       SendMsgBuf(); 
573       PumpMsgs();
574     }
575     MACHSTATE(2, "CommunicationServer barrier begin {");
576     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
577       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
578     MACHSTATE(2, "} CommunicationServer barrier end");
579 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
580     if (CmiMyNode() == 0){
581       CmiPrintf("End of program\n");
582     }
583 #endif
584     MACHSTATE(2, "} CommunicationServer EXIT");
585     MPI_Finalize();
586     exit(0);   
587   }
588 }
589
590 #endif
591
592 static void CommunicationServerThread(int sleepTime)
593 {
594 #if CMK_SMP
595   CommunicationServer(sleepTime);
596 #endif
597 #if CMK_IMMEDIATE_MSG
598   CmiHandleImmediate();
599 #endif
600 }
601
602 #if CMK_NODE_QUEUE_AVAILABLE
603 char *CmiGetNonLocalNodeQ(void)
604 {
605   CmiState cs = CmiGetState();
606   char *result = 0;
607   CmiIdleLock_checkMessage(&cs->idle);
608   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
609     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
610     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
611     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
612     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
613     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
614   }
615   return result;
616 }
617 #endif
618
619 void *CmiGetNonLocal(void)
620 {
621 #if 0
622   static int count=0;
623   CmiState cs = CmiGetState();
624   void *msg;
625   CmiIdleLock_checkMessage(&cs->idle);
626   /* although it seems that lock is not needed, I found it crashes very often
627      on mpi-smp without lock */
628
629   CmiReleaseSentMessages();
630   PumpMsgs();
631   
632   CmiLock(procState[cs->rank].recvLock);
633   msg =  PCQueuePop(cs->recv); 
634   CmiUnlock(procState[cs->rank].recvLock);
635
636 /*
637   if (msg) {
638     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
639   else {
640     count++;
641     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
642   }
643 */
644 #if ! CMK_SMP
645   if (no_outstanding_sends) {
646     while (MsgQueueLen>0) {
647       CmiReleaseSentMessages();
648       PumpMsgs();
649     }
650   }
651   
652   if(!msg) {
653     CmiReleaseSentMessages();
654     if (PumpMsgs())
655       return  PCQueuePop(cs->recv);
656     else
657       return 0;
658   }
659 #endif
660   return msg;
661 #endif
662 }
663
664 /* called in non-smp mode */
665 void CmiNotifyIdle(void)
666 {
667   CmiReleaseSentMessages();
668   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
669 }
670  
671
672 /********************************************************
673     The call to probe immediate messages has been renamed to
674     CmiMachineProgressImpl
675 ******************************************************/
676 /* user call to handle immediate message, only useful in non SMP version
677    using polling method to schedule message.
678 */
679 /*
680 #if CMK_IMMEDIATE_MSG
681 void CmiProbeImmediateMsg()
682 {
683 #if !CMK_SMP
684   PumpMsgs();
685   CmiHandleImmediate();
686 #endif
687 }
688 #endif
689 */
690
691 /* Network progress function is used to poll the network when for
692    messages. This flushes receive buffers on some  implementations*/
693 void CmiMachineProgressImpl()
694 {
695 #if !CMK_SMP
696     PumpMsgs();
697 #if CMK_IMMEDIATE_MSG
698     CmiHandleImmediate();
699 #endif
700 #else
701     /*Not implemented yet. Communication server does not seem to be
702       thread safe */
703     /* CommunicationServerThread(0); */
704 #endif
705 }
706
707 /********************* MESSAGE SEND FUNCTIONS ******************/
708
709 static void CmiSendSelf(char *msg)
710 {
711 #if CMK_IMMEDIATE_MSG
712     if (CmiIsImmediate(msg)) {
713       /* CmiBecomeNonImmediate(msg); */
714       CmiPushImmediateMsg(msg);
715       CmiHandleImmediate();
716       return;
717     }
718 #endif
719     CQdCreate(CpvAccess(cQdState), 1);
720     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
721 }
722
723 void CmiSyncSendFn(int destPE, int size, char *msg)
724 {
725   CmiState cs = CmiGetState();
726   char *dupmsg = (char *) CmiAlloc(size);
727   memcpy(dupmsg, msg, size);
728
729   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
730
731   if (cs->pe==destPE) {
732     CmiSendSelf(dupmsg);
733   }
734   else
735     CmiAsyncSendFn(destPE, size, dupmsg);
736 }
737
738 #if CMK_SMP
739
740 /* called by communication thread in SMP */
741 static int SendMsgBuf()
742 {
743   SMSG_LIST *msg_tmp;
744   char *msg;
745   int node, rank, size;
746   int i;
747   int sent = 0;
748
749   MACHSTATE(2,"SendMsgBuf begin {");
750 #if 0
751   for (i=0; i<_Cmi_mynodesize; i++)
752   {
753     while (!PCQueueEmpty(procState[i].sendMsgBuf))
754     {
755       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
756 #else
757     /* single message sending queue */
758     CmiLock(sendMsgBufLock);
759     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
760     CmiUnlock(sendMsgBufLock);
761     while (NULL != msg_tmp)
762     {
763 #endif
764       node = msg_tmp->destpe;
765       size = msg_tmp->size;
766       msg = msg_tmp->msg;
767       msg_tmp->next = 0;
768       while (MsgQueueLen > request_max) {
769         CmiReleaseSentMessages();
770         PumpMsgs();
771       }
772       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
773       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
774       CMI_SET_CHECKSUM(msg, size);
775       if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req))) 
776         CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
777       MACHSTATE(3,"}MPI_send end");
778       MsgQueueLen++;
779       if(sent_msgs==0)
780         sent_msgs = msg_tmp;
781       else
782         end_sent->next = msg_tmp;
783       end_sent = msg_tmp;
784       sent=1;
785       CmiLock(sendMsgBufLock);
786       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
787       CmiUnlock(sendMsgBufLock);
788     }
789 #if 0
790   } 
791 #endif
792   MACHSTATE(2,"}SendMsgBuf end ");
793   return sent;
794 }
795
796 void EnqueueMsg(void *m, int size, int node)    
797 {
798   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
799   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
800   msg_tmp->msg = m;
801   msg_tmp->size = size; 
802   msg_tmp->destpe = node;       
803   CmiLock(sendMsgBufLock);
804   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
805   CmiUnlock(sendMsgBufLock);
806   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
807 }
808
809 #endif
810
811 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
812 {
813 #if 0
814   CmiState cs = CmiGetState();
815   SMSG_LIST *msg_tmp;
816   CmiUInt2  rank, node;
817      
818   if(destPE == cs->pe) {
819     char *dupmsg = (char *) CmiAlloc(size);
820     memcpy(dupmsg, msg, size);
821     CmiSendSelf(dupmsg);
822     return 0;
823   }
824   CQdCreate(CpvAccess(cQdState), 1);
825 #if CMK_SMP
826   node = CmiNodeOf(destPE);
827   rank = CmiRankOf(destPE);
828   if (node == CmiMyNode())  {
829     CmiPushPE(rank, msg);
830     return 0;
831   }
832   CMI_DEST_RANK(msg) = rank;
833   EnqueueMsg(msg, size, node);
834   return 0;
835 #else
836   /* non smp */
837   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
838   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
839   msg_tmp->msg = msg;
840   msg_tmp->next = 0;
841   while (MsgQueueLen > request_max) {
842         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
843         CmiReleaseSentMessages();
844         PumpMsgs();
845   }
846   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
847   CMI_SET_CHECKSUM(msg, size);
848   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req))) 
849     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
850   MsgQueueLen++;
851   if(sent_msgs==0)
852     sent_msgs = msg_tmp;
853   else
854     end_sent->next = msg_tmp;
855   end_sent = msg_tmp;
856   return (CmiCommHandle) &(msg_tmp->req);
857 #endif
858 #endif
859 }
860
861 void CmiFreeSendFn(int destPE, int size, char *msg)
862 {
863   CmiState cs = CmiGetState();
864   CMI_SET_BROADCAST_ROOT(msg, 0);
865
866   if (cs->pe==destPE) {
867     CmiSendSelf(msg);
868   } else {
869     CmiAsyncSendFn(destPE, size, msg);
870   }
871 }
872
873 /*********************** BROADCAST FUNCTIONS **********************/
874
875 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
876 void CmiSyncSendFn1(int destPE, int size, char *msg)
877 {
878   CmiState cs = CmiGetState();
879   char *dupmsg = (char *) CmiAlloc(size);
880   memcpy(dupmsg, msg, size);
881   if (cs->pe==destPE)
882     CmiSendSelf(dupmsg);
883   else
884     CmiAsyncSendFn(destPE, size, dupmsg);
885 }
886
887 /* send msg to its spanning children in broadcast. G. Zheng */
888 void SendSpanningChildren(int size, char *msg)
889 {
890   CmiState cs = CmiGetState();
891   int startpe = CMI_BROADCAST_ROOT(msg)-1;
892   int i;
893
894   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
895
896   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
897     int p = cs->pe-startpe;
898     if (p<0) p+=_Cmi_numpes;
899     p = BROADCAST_SPANNING_FACTOR*p + i;
900     if (p > _Cmi_numpes - 1) break;
901     p += startpe;
902     p = p%_Cmi_numpes;
903     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
904     CmiSyncSendFn1(p, size, msg);
905   }
906 }
907
908 #include <math.h>
909
910 /* send msg along the hypercube in broadcast. (Sameer) */
911 void SendHypercube(int size, char *msg)
912 {
913   CmiState cs = CmiGetState();
914   int curcycle = CMI_GET_CYCLE(msg);
915   int i;
916
917   double logp = CmiNumPes();
918   logp = log(logp)/log(2.0);
919   logp = ceil(logp);
920   
921   /*  CmiPrintf("In hypercube\n"); */
922
923   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
924
925   for (i = curcycle; i < logp; i++) {
926     int p = cs->pe ^ (1 << i);
927     
928     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
929
930     if(p < CmiNumPes()) {
931       CMI_SET_CYCLE(msg, i + 1);
932       CmiSyncSendFn1(p, size, msg);
933     }
934   }
935 }
936
937 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
938 {
939   CmiState cs = CmiGetState();
940 #if CMK_BROADCAST_SPANNING_TREE
941   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
942   SendSpanningChildren(size, msg);
943   
944 #elif CMK_BROADCAST_HYPERCUBE
945   CMI_SET_CYCLE(msg, 0);
946   SendHypercube(size, msg);
947     
948 #else
949   int i;
950
951   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
952     CmiSyncSendFn(i, size,msg) ;
953   for ( i=0; i<cs->pe; i++ ) 
954     CmiSyncSendFn(i, size,msg) ;
955 #endif
956
957   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
958 }
959
960
961 /*  FIXME: luckily async is never used  G. Zheng */
962 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)  
963 {
964 #if 0
965   CmiState cs = CmiGetState();
966   int i ;
967
968   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
969     CmiAsyncSendFn(i,size,msg) ;
970   for ( i=0; i<cs->pe; i++ ) 
971     CmiAsyncSendFn(i,size,msg) ;
972
973   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
974 CmiAbort("CmiAsyncBroadcastFn should never be called");
975   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
976 #endif
977 }
978
979 void CmiFreeBroadcastFn(int size, char *msg)
980 {
981    CmiSyncBroadcastFn(size,msg);
982    CmiFree(msg);
983 }
984  
985 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
986 {
987  
988 #if CMK_BROADCAST_SPANNING_TREE
989   CmiState cs = CmiGetState();
990   CmiSyncSendFn(cs->pe, size,msg) ;
991   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
992   SendSpanningChildren(size, msg);
993
994 #elif CMK_BROADCAST_HYPERCUBE
995   CmiState cs = CmiGetState();
996   CmiSyncSendFn(cs->pe, size,msg) ;
997   CMI_SET_CYCLE(msg, 0);
998   SendHypercube(size, msg);
999
1000 #else
1001     int i ;
1002      
1003   for ( i=0; i<_Cmi_numpes; i++ ) 
1004     CmiSyncSendFn(i,size,msg) ;
1005 #endif
1006
1007   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1008 }
1009
1010 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)  
1011 {
1012 #if 0
1013   int i ;
1014
1015   for ( i=1; i<_Cmi_numpes; i++ ) 
1016     CmiAsyncSendFn(i,size,msg) ;
1017
1018   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1019     
1020   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1021 #endif
1022 }
1023
1024 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1025 {
1026 #if CMK_BROADCAST_SPANNING_TREE
1027   CmiState cs = CmiGetState();
1028   CmiSyncSendFn(cs->pe, size,msg) ;
1029   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1030   SendSpanningChildren(size, msg);
1031
1032 #elif CMK_BROADCAST_HYPERCUBE
1033   CmiState cs = CmiGetState();
1034   CmiSyncSendFn(cs->pe, size,msg) ;
1035   CMI_SET_CYCLE(msg, 0);
1036   SendHypercube(size, msg);
1037
1038 #else
1039   int i ;
1040      
1041   for ( i=0; i<_Cmi_numpes; i++ ) 
1042     CmiSyncSendFn(i,size,msg) ;
1043 #endif
1044   CmiFree(msg) ;
1045   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1046 }
1047
1048 #if CMK_NODE_QUEUE_AVAILABLE
1049
1050 static void CmiSendNodeSelf(char *msg)
1051 {
1052 #if CMK_IMMEDIATE_MSG
1053     if (CmiIsImmediate(msg) && !_immRunning) {
1054       /*CmiHandleImmediateMessage(msg); */
1055       CmiPushImmediateMsg(msg);
1056       CmiHandleImmediate();
1057       return;
1058     }
1059 #endif
1060     CQdCreate(CpvAccess(cQdState), 1);
1061     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1062     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1063     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1064 }
1065
1066 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1067 {
1068   int i;
1069   SMSG_LIST *msg_tmp;
1070   char *dupmsg;
1071      
1072   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1073   switch (dstNode) {
1074   case NODE_BROADCAST_ALL:
1075     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1076   case NODE_BROADCAST_OTHERS:
1077     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1078     for (i=0; i<_Cmi_numnodes; i++)
1079       if (i!=_Cmi_mynode) {
1080         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1081       }
1082     break;
1083   default:
1084     dupmsg = (char *)CmiCopyMsg(msg,size);
1085     if(dstNode == _Cmi_mynode) {
1086       CmiSendNodeSelf(dupmsg);
1087     }
1088     else {
1089       CQdCreate(CpvAccess(cQdState), 1);
1090       EnqueueMsg(dupmsg, size, dstNode);
1091     }
1092   }
1093   return 0;
1094 }
1095
1096 void CmiSyncNodeSendFn(int p, int s, char *m)
1097 {
1098   CmiAsyncNodeSendFn(p, s, m);
1099 }
1100
1101 /* need */
1102 void CmiFreeNodeSendFn(int p, int s, char *m)
1103 {
1104   CmiAsyncNodeSendFn(p, s, m);
1105   CmiFree(m);
1106 }
1107
1108 /* need */
1109 void CmiSyncNodeBroadcastFn(int s, char *m)
1110 {
1111   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1112 }
1113
1114 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1115 {
1116 }
1117
1118 /* need */
1119 void CmiFreeNodeBroadcastFn(int s, char *m)
1120 {
1121   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1122   CmiFree(m);
1123 }
1124
1125 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1126 {
1127   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1128 }
1129
1130 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1131 {
1132   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1133 }
1134
1135 /* need */
1136 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1137 {
1138   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1139   CmiFree(m);
1140 }
1141 #endif
1142
1143 /************************** MAIN ***********************************/
1144 #define MPI_REQUEST_MAX 16      //1024*10 
1145
1146 void ConverseExit(void)
1147 {
1148 #if 0
1149 #if ! CMK_SMP
1150   while(!CmiAllAsyncMsgsSent()) {
1151     PumpMsgs();
1152     CmiReleaseSentMessages();
1153   }
1154   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD)) 
1155     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1156   ConverseCommonExit();
1157   MPI_Finalize();
1158 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1159   if (CmiMyPe() == 0){
1160     CmiPrintf("End of program\n");
1161   }
1162 #endif
1163   exit(0);
1164 #else
1165   /* SMP version, communication thread will exit */
1166   ConverseCommonExit();
1167   /* atomic increment */
1168   CmiCommLock();
1169   inexit++;
1170   CmiCommUnlock();
1171   while (1) CmiYield();
1172 #endif
1173 #endif
1174 }
1175
1176 static char     **Cmi_argv;
1177 static char     **Cmi_argvcopy;
1178 static CmiStartFn Cmi_startfn;   /* The start function */
1179 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1180
1181 typedef struct {
1182   int sleepMs; /*Milliseconds to sleep while idle*/
1183   int nIdles; /*Number of times we've been idle in a row*/
1184   CmiState cs; /*Machine state*/
1185 } CmiIdleState;
1186
1187 static CmiIdleState *CmiNotifyGetState(void)
1188 {
1189   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1190   s->sleepMs=0;
1191   s->nIdles=0;
1192   s->cs=CmiGetState();
1193   return s;
1194 }
1195
1196 static void CmiNotifyBeginIdle(CmiIdleState *s)
1197 {
1198   s->sleepMs=0;
1199   s->nIdles=0;
1200 }
1201     
1202 static void CmiNotifyStillIdle(CmiIdleState *s)
1203
1204 #if ! CMK_SMP
1205   CmiReleaseSentMessages();
1206   PumpMsgs();
1207 #else
1208 /*  CmiYield();  */
1209 #endif
1210
1211 #if 1
1212   {
1213   int nSpins=20; /*Number of times to spin before sleeping*/
1214   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1215   s->nIdles++;
1216   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1217     s->sleepMs+=2;
1218     if (s->sleepMs>10) s->sleepMs=10;
1219   }
1220   /*Comm. thread will listen on sockets-- just sleep*/
1221   if (s->sleepMs>0) {
1222     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1223     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1224     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1225   }       
1226   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1227   }
1228 #endif
1229 }
1230
1231 #if MACHINE_DEBUG_LOG
1232 FILE *debugLog = NULL;
1233 #endif
1234
1235 static void ConverseRunPE(int everReturn)
1236 {
1237   CmiIdleState *s=CmiNotifyGetState();
1238   CmiState cs;
1239   char** CmiMyArgv;
1240   CmiNodeAllBarrier();
1241   cs = CmiGetState();
1242   CpvInitialize(void *,CmiLocalQueue);
1243   CpvAccess(CmiLocalQueue) = cs->localqueue;
1244
1245   if (CmiMyRank())
1246     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1247   else   
1248     CmiMyArgv=Cmi_argv;
1249     
1250   CthInit(CmiMyArgv);
1251
1252   ConverseCommonInit(CmiMyArgv);
1253
1254   /* initialize the network progress counter*/
1255   /* Network progress function is used to poll the network when for
1256      messages. This flushes receive buffers on some  implementations*/
1257   CpvInitialize(int , networkProgressCount);
1258   CpvAccess(networkProgressCount) = 0; 
1259
1260 #if CMK_SMP
1261   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1262   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1263 #else
1264   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1265 #endif
1266
1267 #if MACHINE_DEBUG_LOG
1268   if (CmiMyRank() == 0) {
1269     char ln[200];
1270     sprintf(ln,"debugLog.%d",CmiMyNode());
1271     debugLog=fopen(ln,"w");
1272   }
1273 #endif
1274
1275   /* Converse initialization finishes, immediate messages can be processed.
1276      node barrier previously should take care of the node synchronization */
1277   _immediateReady = 1;
1278
1279   /* communication thread */
1280   if (CmiMyRank() == CmiMyNodeSize()) {
1281     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1282     while (1) CommunicationServerThread(5);
1283   }
1284   else {  /* worker thread */
1285   if (!everReturn) {
1286     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1287     if (Cmi_usrsched==0) CsdScheduler(-1);
1288     ConverseExit();
1289   }
1290   }
1291 }
1292
1293 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1294 {
1295   ptl_process_id_t   my_id;
1296   cnos_nidpid_map_t *nidpid;
1297   int                max_ifaces;
1298   int n,i;
1299
1300 #if MACHINE_DEBUG
1301   debugLog=NULL;
1302 #endif
1303 #if CMK_USE_HP_MAIN_FIX
1304 #if FOR_CPLUS
1305   _main(argc,argv);
1306 #endif
1307 #endif
1308   
1309   _Cmi_numnodes = cnos_get_size();
1310
1311   _Cmi_mynode = cnos_get_rank();
1312
1313   (void)cnos_get_nidpid_map( &nidpid );
1314
1315   if ( PtlInit( &max_ifaces ) != PTL_OK ) {
1316     fprintf(stderr,"%d: PtlInit() failed\n", CmiMyNode());
1317     return;
1318   }
1319
1320   if ( PtlGetId( _ni_handle, &my_id ) != PTL_OK ) {
1321     fprintf(stderr,"%d: PtlGetId() failed\n", CmiMyNode());
1322     return;
1323   }
1324
1325   if ( (my_id.nid != nidpid[CmiMyNode()].nid) ||
1326        (my_id.pid != nidpid[CmiMyNode()].pid) ) {
1327     fprintf(stderr,"%d: Error: invalid nid/pid map\n", CmiMyNode());
1328     return;
1329   }
1330   mynid = my_id.nid;
1331   mypid = my_id.pid;
1332   printf("%d: nidpid is (%d, %d)\n", CmiMyNode(), nidpid[CmiMyNode()].nid, nidpid[CmiMyNode()].pid);
1333
1334
1335   /* processor per node */
1336   _Cmi_mynodesize = 1;
1337   CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
1338 #if ! CMK_SMP
1339   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0) 
1340     CmiAbort("+ppn cannot be used in non SMP version!\n");
1341 #endif
1342   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1343   if (idleblock && _Cmi_mynode == 0) {
1344     CmiPrintf("Charm++: Running in idle blocking mode.\n");
1345   }
1346
1347 #if CMK_NO_OUTSTANDING_SENDS
1348   no_outstanding_sends=1;
1349 #endif
1350   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1351     no_outstanding_sends = 1;
1352     if (_Cmi_mynode == 0)
1353       CmiPrintf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1354         no_outstanding_sends?"":" not");
1355   }
1356   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1357   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1358   Cmi_argvcopy = CmiCopyArgs(argv);
1359   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1360   /* find dim = log2(numpes), to pretend we are a hypercube */
1361   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1362     Cmi_dim++ ;
1363  /* CmiSpanTreeInit();*/
1364   request_max=MAX_QLEN;
1365   CmiGetArgInt(argv,"+requestmax",&request_max);
1366   /*printf("request max=%d\n", request_max);*/
1367
1368   /* checksum flag */
1369   if (CmiGetArgFlag(argv,"+checksum")) {
1370 #if !CMK_OPTIMIZE
1371     checksum_flag = 1;
1372     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1373 #else
1374     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1375 #endif
1376   }
1377
1378   if (CmiGetArgFlag(argv,"++debug"))
1379   {   /*Pause so user has a chance to start and attach debugger*/
1380     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
1381     if (!CmiGetArgFlag(argv,"++debug-no-pause"))
1382       sleep(10);
1383   }
1384
1385   /* CmiTimerInit(); */
1386
1387 #if 0
1388   CthInit(argv);
1389   ConverseCommonInit(argv);
1390   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
1391   if (initret==0) {
1392     fn(CmiGetArgc(argv), argv);
1393     if (usched==0) CsdScheduler(-1);
1394     ConverseExit();
1395   }
1396 #endif
1397
1398   CsvInitialize(CmiNodeState, NodeState);
1399   CmiNodeStateInit(&CsvAccess(NodeState));
1400
1401   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1402
1403   for (i=0; i<_Cmi_mynodesize+1; i++) {
1404     /*    procState[i].sendMsgBuf = PCQueueCreate();   */
1405     procState[i].recvLock = CmiCreateLock();
1406   }
1407 #if CMK_SMP
1408   sendMsgBuf = PCQueueCreate();
1409   sendMsgBufLock = CmiCreateLock();
1410 #endif
1411
1412   /* Network progress function is used to poll the network when for
1413      messages. This flushes receive buffers on some  implementations*/
1414   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
1415   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
1416
1417   CmiStartThreads(argv);
1418   ConverseRunPE(initret);
1419 }
1420
1421 /***********************************************************************
1422  *
1423  * Abort function:
1424  *
1425  ************************************************************************/
1426
1427 void CmiAbort(const char *message)
1428 {
1429   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1430         "Reason: %s\n",CmiMyPe(),message);
1431  /*  CmiError(message); */
1432   CmiPrintStackTrace(0);
1433   /*MPI_Abort(MPI_COMM_WORLD, 1);*/
1434 }
1435
1436
1437 /*@}*/