Changed the interOperate to a global variable so that it is initialized from
[charm.git] / src / arch / vmi / machine.c
1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
3 **
4 ** This code does not work correctly with glibc 2.2.92 or lower due to
5 ** problems with Converse threads interacting with VMI's use of pthreads.
6 ** To check which version is on a system:
7 **
8 **    nm -a /lib/libc.so.6 | grep 'A GLIBC_'
9 */
10
11 /** @file
12  * VMI machine layer
13  * @ingroup Machine
14  * @{
15  */
16
17 #include "machine.h"
18
19 //#define GK_DELAY_DEVICE 1
20 //#define CMI_VMI_USE_VMI22 1
21
22 /* The following are external variables used by the VMI core. */
23 extern USHORT VMI_DEVICE_RUNTIME;
24 extern PVMI_NETADDRESS localAddress;
25 extern VMIStreamRecv recvFn;
26
27 /* The following are variables and functions used by the Converse core. */
28 int _Cmi_numpes;
29 int _Cmi_mype;
30 int _Cmi_myrank = 0;
31
32 CpvDeclare (void *, CmiLocalQueue);
33 CpvDeclare (void *, CMI_VMI_RemoteQueue);
34
35 extern void CthInit (char **argv);
36 extern void ConverseCommonInit (char **argv);
37
38 /* Global variables. */
39 char *CMI_VMI_Username;
40 char *CMI_VMI_Program_Key;
41 int CMI_VMI_Startup_Type;
42 int CMI_VMI_WAN_Latency;
43 int CMI_VMI_Cluster;
44 int CMI_VMI_Probe_Clusters;
45 int CMI_VMI_Memory_Pool;
46 int CMI_VMI_Terminate_VMI_Hack;
47 int CMI_VMI_Connection_Timeout;
48 int CMI_VMI_Maximum_Handles;
49 int CMI_VMI_Small_Message_Boundary;
50 int CMI_VMI_Medium_Message_Boundary;
51 int CMI_VMI_Eager_Protocol;
52 int CMI_VMI_Eager_Interval;
53 int CMI_VMI_Eager_Threshold;
54 int CMI_VMI_Eager_Short_Pollset_Size_Maximum;
55 int CMI_VMI_Eager_Short_Slots;
56 int CMI_VMI_Eager_Short_Message_Boundary;
57 int CMI_VMI_Eager_Long_Buffers;
58 int CMI_VMI_Eager_Long_Buffer_Size;
59
60 volatile int CMI_VMI_Message_Receive_Count;
61 volatile int CMI_VMI_AsyncMsgCount;
62 volatile int CMI_VMI_Barrier_Count;
63
64 int CMI_VMI_Charmrun_Socket;
65 char CMI_VMI_Charmrun_IP[1024];
66 int CMI_VMI_Charmrun_Port;
67
68 int CMI_VMI_CRM_Socket;
69 char *CMI_VMI_CRM_Hostname;
70 int CMI_VMI_CRM_Port;
71
72 CMI_VMI_Process_T *CMI_VMI_Processes;
73 CMI_VMI_Process_T **CMI_VMI_Eager_Short_Pollset;
74 int CMI_VMI_Eager_Short_Pollset_Size;
75
76 CMI_VMI_Handle_T *CMI_VMI_Handles;
77 int CMI_VMI_Next_Handle;
78
79 int CMI_VMI_Latency_Vectors_Received;
80 BOOLEAN CMI_VMI_Cluster_Mapping_Received;
81
82 PVMI_BUFFER_POOL CMI_VMI_Bucket1_Pool;
83 PVMI_BUFFER_POOL CMI_VMI_Bucket2_Pool;
84 PVMI_BUFFER_POOL CMI_VMI_Bucket3_Pool;
85 PVMI_BUFFER_POOL CMI_VMI_Bucket4_Pool;
86 PVMI_BUFFER_POOL CMI_VMI_Bucket5_Pool;
87
88 #if CMK_GRID_QUEUE_AVAILABLE
89 CMI_VMI_Grid_Object_T *CMI_VMI_Grid_Objects;
90 int CMI_VMI_Grid_Objects_Index;
91 int CMI_VMI_Grid_Queue;
92 int CMI_VMI_Grid_Queue_Maximum;
93 int CMI_VMI_Grid_Queue_Interval;
94 int CMI_VMI_Grid_Queue_Threshold;
95 #endif
96
97 #ifdef GK_DELAY_DEVICE
98 typedef struct
99 {
100   double  time;
101   char   *msg;
102   int     msgsize;
103   int     sender;
104   void   *next;
105 } gk_delayed_msgs;
106
107 double gk_timeout1;
108 gk_delayed_msgs *gk_head_ptr1;
109 gk_delayed_msgs *gk_tail_ptr1;
110
111 double gk_timeout2;
112 gk_delayed_msgs *gk_head_ptr2;
113 gk_delayed_msgs *gk_tail_ptr2;
114 #endif
115
116
117 /**************************************************************************
118 ** This function is the entry point for all Converse and Charm++ codes.
119 ** 
120 ** argc
121 ** argv
122 ** start_function - the user-supplied function to run (function pointer)
123 ** user_calls_scheduler - boolean for whether ConverseInit() should invoke
124 **                        the scheduler or whether user code will do it
125 ** init_returns - boolean for whether ConverseInit() returns
126 */
127 void ConverseInit (int argc, char **argv, CmiStartFn start_function, int user_calls_scheduler, int init_returns)
128 {
129   int rc;
130   int i;
131   int j;
132
133
134   DEBUG_PRINT ("ConverseInit() called.\n");
135
136   /* Get a default program key from argv[0]. */
137   if (!(CMI_VMI_Program_Key = strdup (argv[0]))) {
138     CmiAbort ("Unable to allocate memory for the program key.");
139   }
140
141   /* Initialize global variables. */
142   CMI_VMI_Startup_Type                     = CMI_VMI_STARTUP_TYPE_UNKNOWN;
143   CMI_VMI_WAN_Latency                      = CMI_VMI_WAN_LATENCY;
144   CMI_VMI_Cluster                          = CMI_VMI_CLUSTER_UNKNOWN;
145   CMI_VMI_Probe_Clusters                   = CMI_VMI_PROBE_CLUSTERS;
146 #if CMK_GRID_QUEUE_AVAILABLE
147   CMI_VMI_Grid_Queue                       = CMI_VMI_GRID_QUEUE;
148   CMI_VMI_Grid_Queue_Maximum               = CMI_VMI_GRID_QUEUE_MAXIMUM;
149   CMI_VMI_Grid_Queue_Interval              = CMI_VMI_GRID_QUEUE_INTERVAL;
150   CMI_VMI_Grid_Queue_Threshold             = CMI_VMI_GRID_QUEUE_THRESHOLD;
151 #endif
152   CMI_VMI_Memory_Pool                      = CMI_VMI_MEMORY_POOL;
153   CMI_VMI_Terminate_VMI_Hack               = CMI_VMI_TERMINATE_VMI_HACK;
154   CMI_VMI_Connection_Timeout               = CMI_VMI_CONNECTION_TIMEOUT;
155   CMI_VMI_Maximum_Handles                  = CMI_VMI_MAXIMUM_HANDLES;
156   CMI_VMI_Small_Message_Boundary           = CMI_VMI_SMALL_MESSAGE_BOUNDARY;
157   CMI_VMI_Medium_Message_Boundary          = CMI_VMI_MEDIUM_MESSAGE_BOUNDARY;
158   CMI_VMI_Eager_Protocol                   = CMI_VMI_EAGER_PROTOCOL;
159   CMI_VMI_Eager_Interval                   = CMI_VMI_EAGER_INTERVAL;
160   CMI_VMI_Eager_Threshold                  = CMI_VMI_EAGER_THRESHOLD;
161   CMI_VMI_Eager_Short_Pollset_Size_Maximum = CMI_VMI_EAGER_SHORT_POLLSET_SIZE_MAXIMUM;
162   CMI_VMI_Eager_Short_Slots                = CMI_VMI_EAGER_SHORT_SLOTS;
163   CMI_VMI_Eager_Short_Message_Boundary     = CMI_VMI_EAGER_SHORT_MESSAGE_BOUNDARY;
164   CMI_VMI_Eager_Long_Buffers               = CMI_VMI_EAGER_LONG_BUFFERS;
165   CMI_VMI_Eager_Long_Buffer_Size           = CMI_VMI_EAGER_LONG_BUFFER_SIZE;
166
167   CMI_VMI_Message_Receive_Count = 0;
168   CMI_VMI_AsyncMsgCount = 0;
169   CMI_VMI_Barrier_Count = 0;
170
171   CMI_VMI_Cluster_Mapping_Received = FALSE;
172
173   /* Read global variable values from the environment. */
174   CMI_VMI_Read_Environment ();
175
176   /* Set up the process array and initialize. */
177   CMI_VMI_Processes = (CMI_VMI_Process_T *) (malloc (_Cmi_numpes * sizeof (CMI_VMI_Process_T)));
178   if (!CMI_VMI_Processes) {
179     CmiAbort ("Unable to allocate memory for process array.");
180   }
181
182   CMI_VMI_Eager_Short_Pollset = (CMI_VMI_Process_T **) (malloc (_Cmi_numpes * sizeof (CMI_VMI_Process_T *)));
183   if (!CMI_VMI_Eager_Short_Pollset) {
184     CmiAbort ("Unable to allocate memory for eager pollset array.");
185   }
186
187   for (i = 0; i < _Cmi_numpes; i++) {
188     (&CMI_VMI_Processes[i])->connection_state = CMI_VMI_CONNECTION_DISCONNECTED;
189     (&CMI_VMI_Processes[i])->cluster = CMI_VMI_CLUSTER_UNKNOWN;
190
191     (&CMI_VMI_Processes[i])->latency_vector = NULL;
192
193     (&CMI_VMI_Processes[i])->normal_short_count;
194     (&CMI_VMI_Processes[i])->normal_long_count;
195     (&CMI_VMI_Processes[i])->eager_short_count;
196     (&CMI_VMI_Processes[i])->eager_long_count;
197
198     for (j = 0; j < CMI_VMI_Eager_Short_Slots; j++) {
199       (&CMI_VMI_Processes[i])->eager_short_send_handles[j] = NULL;
200       (&CMI_VMI_Processes[i])->eager_short_receive_handles[j] = NULL;
201     }
202
203     (&CMI_VMI_Processes[i])->eager_short_send_size = 0;
204     (&CMI_VMI_Processes[i])->eager_short_send_index = 0;
205     (&CMI_VMI_Processes[i])->eager_short_send_credits_available = 0;
206
207     (&CMI_VMI_Processes[i])->eager_short_receive_size = 0;
208     (&CMI_VMI_Processes[i])->eager_short_receive_index = 0;
209     (&CMI_VMI_Processes[i])->eager_short_receive_dirty = 0;
210     (&CMI_VMI_Processes[i])->eager_short_receive_credits_replentish = 0;
211
212     CMI_VMI_Eager_Short_Pollset[i] = (CMI_VMI_Process_T *) NULL;
213
214     for (j = 0; j < CMI_VMI_Eager_Long_Buffers; j++) {
215       (&CMI_VMI_Processes[i])->eager_long_send_handles[j] = NULL;
216       (&CMI_VMI_Processes[i])->eager_long_receive_handles[j] = NULL;
217     }
218
219     (&CMI_VMI_Processes[i])->eager_long_send_size = 0;
220     (&CMI_VMI_Processes[i])->eager_long_receive_size = 0;
221   }
222
223   CMI_VMI_Eager_Short_Pollset_Size = 0;
224
225   /* Set up the send/receive handle array and initialize. */
226   CMI_VMI_Handles = (CMI_VMI_Handle_T *) (malloc (CMI_VMI_Maximum_Handles * sizeof (CMI_VMI_Handle_T)));
227   if (!CMI_VMI_Handles) {
228     CmiAbort ("Unable to allocate memory for handle array.");
229   }
230
231   for (i = 0; i < CMI_VMI_Maximum_Handles; i++) {
232     (&CMI_VMI_Handles[i])->index = i;
233     (&CMI_VMI_Handles[i])->refcount = 0;
234   }
235
236   CMI_VMI_Next_Handle = 0;
237
238   /* Print out debug information if compiled with debug support. */
239   DEBUG_PRINT ("The program key is %s.\n", key);
240   DEBUG_PRINT ("The startup type is %d.\n", CMI_VMI_Startup_Type);
241
242   /* Start up via the startup type selected. */
243   switch (CMI_VMI_Startup_Type)
244   {
245     case CMI_VMI_STARTUP_TYPE_CRM:
246       rc = CMI_VMI_Startup_CRM ();
247       break;
248
249     case CMI_VMI_STARTUP_TYPE_CHARMRUN:
250       rc = CMI_VMI_Startup_Charmrun ();
251       break;
252
253     default:
254       CmiAbort ("An unknown startup type was specified.");
255       break;
256   }
257
258   if (rc < 0) {
259     CmiAbort ("There was a fatal error during the startup phase.");
260   }
261
262 #ifdef GK_DELAY_DEVICE
263   {
264     char *value;
265     unsigned long timeout_us;
266
267     if (value = getenv ("GK_GET_LATENCY")) {
268       timeout_us = (unsigned long) atoi (value);
269       gk_timeout1 = ((double) timeout_us) * 0.000001;
270       gk_timeout2 = ((double) timeout_us) * 0.000001 * 2;
271       if (_Cmi_mype == 0) {
272         CmiPrintf ("*** Charm is using artificial latency of %f and artificial Get latency of %f\n", gk_timeout1, gk_timeout2);
273       }
274     } else {
275       CmiAbort ("Charm built with GK_DELAY_DEVICE but GK_GET_LATENCY not in environment.");
276     }
277
278     gk_head_ptr1 = NULL;
279     gk_tail_ptr1 = NULL;
280
281     gk_head_ptr2 = NULL;
282     gk_tail_ptr2 = NULL;
283   }
284 #endif
285
286   /* Initialize VMI. */
287   rc = CMI_VMI_Initialize_VMI ();
288
289   if (rc < 0) {
290     CmiAbort ("There was a fatal error during VMI initialization.");
291   }
292
293   DEBUG_PRINT ("VMI was initialized successfully.\n");
294
295   /*
296     Create the FIFOs for holding local and remote messages.
297
298     NOTE: FIFO creation must happen at this point due to a race condition
299           where some processes may open their connections and start sending
300           messages before all of the other processes are started, and we
301           must be able to deal with this situation.
302   */
303   CpvAccess (CmiLocalQueue) = CdsFifo_Create ();
304   CpvAccess (CMI_VMI_RemoteQueue) = CdsFifo_Create ();
305
306   /* Open connections. */
307   rc = CMI_VMI_Open_Connections ();
308
309   if (rc < 0) {
310     CmiAbort ("Fatal error during connection setup phase.");
311   }
312
313   /*
314     Probe the cluster mapping by requesting all-to-all latencies (if requested).
315
316     NOTE: This must start with a CmiBarrier() because some processes may not have
317     completed opening connections, so probing latencies on these connections will
318     result in a segfault.
319   */
320   if (CMI_VMI_Probe_Clusters) {
321     if (_Cmi_mype == 0) {
322       CmiBarrier ();
323       CmiProbeLatencies ();
324       CMI_VMI_Compute_Cluster_Mapping ();
325       CMI_VMI_Distribute_Cluster_Mapping ();
326     } else {
327       CmiBarrier ();
328       CMI_VMI_Wait_Cluster_Mapping ();
329     }
330   }
331
332 #if CMK_GRID_QUEUE_AVAILABLE
333   CMI_VMI_Grid_Objects = (CMI_VMI_Grid_Object_T *) malloc (CMI_VMI_Grid_Queue_Maximum * sizeof (CMI_VMI_Grid_Object_T));
334   if (!CMI_VMI_Grid_Objects) {
335     CmiAbort ("Unable to allocate memory for Grid objects array.\n");
336   }
337   CMI_VMI_Grid_Objects_Index = 0;
338 #endif
339
340   DEBUG_PRINT ("ConverseInit() is starting the main processing loop.\n");
341
342   /* Initialize Converse and start the main processing loop. */
343   CthInit (argv);
344   ConverseCommonInit (argv);
345
346   /* Set up CmiNotifyIdle() to be called when processor goes idle. */
347   CcdCallOnConditionKeep (CcdPROCESSOR_STILL_IDLE, (CcdVoidFn) CmiNotifyIdle, NULL);
348
349   if (!init_returns) {
350     start_function (CmiGetArgc (argv), argv);
351     if (!user_calls_scheduler) {
352       CsdScheduler (-1);
353     }
354     ConverseExit ();
355   }
356 }
357
358
359
360 /**************************************************************************
361 ** This function is the exit point for all Converse and Charm++ codes.
362 */
363 void ConverseExit ()
364 {
365   VMI_STATUS status;
366
367   int i;
368
369
370   DEBUG_PRINT ("ConverseExit() called.\n");
371
372   /* ConverseCommonExit() shuts down CCS and closes Projections logs. */
373   ConverseCommonExit ();
374
375   /* Barrier to ensure that all processes are ready to exit. */
376   CmiBarrier ();
377
378   /* Call VMI_Poll() several times to drain the network. */
379   for (i = 0; i < 1000000; i++) {
380     status = VMI_Poll ();
381     CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
382   }
383
384   /*
385     Signal the charmrun terminal that the computation has ended (if necessary).
386
387     NOTE: Get rid of charmrun as early in ConverseExit() as possible in case
388     something bad happens below.
389   */
390   if (CMI_VMI_Startup_Type == CMI_VMI_STARTUP_TYPE_CHARMRUN) {
391     CMI_VMI_Charmrun_Message_Header_T hdr;
392     int rc;
393     char dummy[10];
394
395     hdr.msg_len = htonl (0);
396     strcpy (hdr.msg_type, "ending");
397
398     rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, (const void *) &hdr, (int) sizeof (CMI_VMI_Charmrun_Message_Header_T));
399     if (rc < 0) {
400       DEBUG_PRINT ("Error sending to charmrun.\n");
401     }
402
403     /*
404       Reading from the charmrun socket acts as a sort of "barrier" because the call
405       blocks until all processes signal ending and charmrun closes all sockets.  This
406       is important because it ensures that no processes exit until all processes signal
407       charmrun that they are ready to terminate.  Failing to do this causes charmrun
408       to print an error message if any processes exit before all processes signal that
409       they are terminating.
410
411       NOTE: This must be a read() to work correctly!
412     */
413     read (CMI_VMI_Charmrun_Socket, dummy, 1);
414   }
415
416   /* If a clean VMI termination is requested, do it. */
417   if (!CMI_VMI_Terminate_VMI_Hack) {
418     /* Close all VMI connections. */
419     CMI_VMI_Close_Connections ();
420
421     /* Call VMI_Poll() several times to drain the network. */
422     for (i = 0; i < 1000000; i++) {
423       status = VMI_Poll ();
424       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
425     }
426
427     /* Terminate VMI cleanly. */
428     CMI_VMI_Terminate_VMI ();
429
430     /* Free resources. */
431 #if CMK_GRID_QUEUE_AVAILABLE
432     free (CMI_VMI_Grid_Objects);
433 #endif
434     CdsFifo_Destroy (CpvAccess (CMI_VMI_RemoteQueue));
435     CdsFifo_Destroy (CpvAccess (CmiLocalQueue));
436
437     for (i = 0; i < _Cmi_numpes; i++) {
438       if ((&CMI_VMI_Processes[i])->latency_vector) {
439         free ((&CMI_VMI_Processes[i])->latency_vector);
440       }
441     }
442
443     free (CMI_VMI_Handles);
444     free (CMI_VMI_Eager_Short_Pollset);
445     free (CMI_VMI_Processes);
446     free (CMI_VMI_Program_Key);
447     free (CMI_VMI_Username);
448   }
449
450   /* Must call exit() in order to actually terminate the program. */
451   exit (0);
452 }
453
454
455
456 /**************************************************************************
457 **
458 */
459 void CmiAbort (const char *message)
460 {
461   DEBUG_PRINT ("CmiAbort() called.\n");
462
463   printf ("%s\n", message);
464   exit (1);
465 }
466
467
468
469 /**************************************************************************
470 **
471 */
472 void CmiNotifyIdle ()
473 {
474   VMI_STATUS status;
475
476   CMI_VMI_Process_T *process;
477   CMI_VMI_Handle_T *handle;
478
479   int index;
480   char *msg;
481   CMI_VMI_Eager_Short_Slot_Footer_T *footer;
482   int credits_temp;
483
484   CMI_VMI_Credit_Message_T credit_msg;
485   PVOID addrs[1];
486   ULONG sz[1];
487
488   int i;
489
490
491   DEBUG_PRINT ("CmiNotifyIdle() called.\n");
492
493   if (CMI_VMI_Eager_Protocol) {
494     /*
495       Check to see if any processes have a large number of outstanding eager short credits.
496
497       Normally, eager short credits are replentished on the sender when we send a message
498       to it.  If we do not communicate frequently with the sender, this does not happen
499       automatically and we need to explicitly send credit updates.
500     */
501     for (i = 0; i < _Cmi_numpes; i++) {
502       process = &CMI_VMI_Processes[i];
503
504       if (process->eager_short_receive_credits_replentish >= (0.75 * CMI_VMI_Eager_Short_Slots)) {
505         CMI_VMI_MESSAGE_TYPE (&credit_msg) = CMI_VMI_MESSAGE_TYPE_CREDIT;
506         CMI_VMI_MESSAGE_CREDITS (&credit_msg) = process->eager_short_receive_credits_replentish;
507
508 #if CMK_BROADCAST_SPANNING_TREE
509         CMI_SET_BROADCAST_ROOT (&credit_msg, 0);
510 #endif
511
512         addrs[0] = (PVOID) &credit_msg;
513         sz[0] = (ULONG) (sizeof (CMI_VMI_Credit_Message_T));
514
515         status = VMI_Stream_Send_Inline (process->connection, addrs, sz, 1, sizeof (CMI_VMI_Credit_Message_T));
516         CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
517
518         process->eager_short_receive_credits_replentish = 0;
519       }
520     }
521
522     /*
523       Check to see if any processes are communicating with us frequently.
524       These processes are candidates for eager communications.
525     */
526     if (CMI_VMI_Message_Receive_Count >= CMI_VMI_Eager_Interval) {
527       for (i = 0; i < _Cmi_numpes; i++) {
528         if ((CMI_VMI_Eager_Short_Pollset_Size < CMI_VMI_Eager_Short_Pollset_Size_Maximum) &&
529             ((&CMI_VMI_Processes[i])->normal_short_count >= CMI_VMI_Eager_Threshold) &&
530             ((&CMI_VMI_Processes[i])->eager_short_receive_size == 0) &&
531             (VMI_CONNECT_ONE_WAY_LATENCY ((&CMI_VMI_Processes[i])->connection) < CMI_VMI_WAN_Latency)) {
532           CMI_VMI_Eager_Short_Setup (i);
533         }
534
535         if (((&CMI_VMI_Processes[i])->normal_long_count >= CMI_VMI_Eager_Threshold) &&
536             ((&CMI_VMI_Processes[i])->eager_long_receive_size == 0)) {
537           CMI_VMI_Eager_Long_Setup (i, CMI_VMI_Eager_Long_Buffer_Size);
538         }
539
540         (&CMI_VMI_Processes[i])->normal_short_count = 0;
541         (&CMI_VMI_Processes[i])->normal_long_count = 0;
542         (&CMI_VMI_Processes[i])->eager_short_count = 0;
543         (&CMI_VMI_Processes[i])->eager_long_count = 0;
544       }
545
546       CMI_VMI_Message_Receive_Count = 0;
547     }
548   }
549
550   /* Pump the message loop. */
551   status = VMI_Poll ();
552   CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
553 }
554
555
556
557 /**************************************************************************
558 **
559 */
560 void CmiMemLock ()
561 {
562   DEBUG_PRINT ("CmiMemLock() called.\n");
563
564   /* Empty. */
565 }
566
567
568
569 /**************************************************************************
570 **
571 */
572 void CmiMemUnlock ()
573 {
574   DEBUG_PRINT ("CmiMemUnlock() called.\n");
575
576   /* Empty. */
577 }
578
579
580
581 /**************************************************************************
582 ** This is our implementation of CmiPrintf().  For the case where the code
583 ** was started by charmrun, we must use the charmrun protocol to send
584 ** output back to the charmrun terminal.  Otherwise, we can simply send
585 ** output to the program's stdout (which is automatically redirected to
586 ** a socket that is attached to the right place).
587 **
588 ** NOTE: When sending to the charmrun terminal, an explicit NULL must be
589 ** included at the end of the message buffer.  The charmrun terminal reuses
590 ** its buffer, so if a terminating NULL is not sent, the tail of any
591 ** previous larger-sized message is printed after the shorter message sent
592 ** here.
593 */
594 void CmiPrintf (const char *format, ...)
595 {
596   DEBUG_PRINT ("CmiPrintf() called.\n");
597
598   if (CMI_VMI_Startup_Type == CMI_VMI_STARTUP_TYPE_CHARMRUN) {
599     CMI_VMI_Charmrun_Message_Header_T hdr;
600     va_list args;
601     char *temp_str;
602     int rc;
603
604     va_start (args, format);
605     vasprintf (&temp_str, format, args);
606
607     hdr.msg_len = htonl (strlen (temp_str) + 1);
608     strcpy (hdr.msg_type, "print");
609
610     rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, (const void *) &hdr, sizeof (CMI_VMI_Charmrun_Message_Header_T));
611     if (rc < 0) {
612       DEBUG_PRINT ("Error sending to charmrun.\n");
613     }
614     rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, temp_str, ((strlen (temp_str)) + 1));
615     if (rc < 0) {
616       DEBUG_PRINT ("Error sending to charmrun.\n");
617     }
618
619     free (temp_str);
620   } else {
621     va_list args;
622     va_start (args, format);
623     vprintf (format, args);
624     fflush (stdout);
625     va_end (args);
626   }
627 }
628
629
630
631 /**************************************************************************
632 ** See comments for CmiPrintf() above.
633 */
634 void CmiError (const char *format, ...)
635 {
636   DEBUG_PRINT ("CmiError() called.\n");
637
638   if (CMI_VMI_Startup_Type == CMI_VMI_STARTUP_TYPE_CHARMRUN) {
639     CMI_VMI_Charmrun_Message_Header_T hdr;
640     va_list args;
641     char *temp_str;
642     int rc;
643
644     va_start (args, format);
645     vasprintf (&temp_str, format, args);
646
647     hdr.msg_len = htonl (strlen (temp_str) + 1);
648     strcpy (hdr.msg_type, "printerr");
649
650     rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, (const void *) &hdr, sizeof (CMI_VMI_Charmrun_Message_Header_T));
651     if (rc < 0) {
652       DEBUG_PRINT ("Error sending to charmrun.\n");
653     }
654     rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, temp_str, ((strlen (temp_str)) + 1));
655     if (rc < 0) {
656       DEBUG_PRINT ("Error sending to charmrun.\n");
657     }
658
659     free (temp_str);
660   } else {
661     va_list args;
662     va_start (args, format);
663     vfprintf (stderr, format, args);
664     fflush (stdout);
665     va_end (args);
666   }
667 }
668
669
670
671 /**************************************************************************
672 **
673 */
674 int CmiScanf (const char *format, ...)
675 {
676   int rc;
677
678
679   DEBUG_PRINT ("CmiScanf() called.\n");
680
681   if (CMI_VMI_Startup_Type == CMI_VMI_STARTUP_TYPE_CHARMRUN) {
682     CmiAbort ("CmiScanf() is not implemented for startup type Charmrun.");
683   } else {
684     va_list args;
685     va_start (args, format);
686     rc = vfscanf (stdin, format, args);
687     va_end (args);
688     return (rc);
689   }
690 }
691
692
693
694 /**************************************************************************
695 ** This is a simple barrier function, similar to the one implemented in the
696 ** net-linux-gm machine layer.  This routine assumes that there are few
697 ** messages in flight; I have not tested extensively with many outstanding
698 ** messages and there could very well be some nasty race conditions.
699 **
700 ** This routine was implemented to allow clocks to be synchronized at
701 ** program startup time, so Projections timeline views do not show message
702 ** sends that appear after the corresponding message deliveries.
703 **
704 ** THIS CODE ASSUMES THAT CMI_VMI_Barrier_Count IS INITIALIZED TO 0
705 ** DURING ConverseInit()!  We cannot initialize it in this function due
706 ** to a race condition where PE 0 might invoke CmiBarrer() much later
707 ** than other nodes in the computation.  In this case, it might have
708 ** already seen barrier messages coming in from the other nodes and
709 ** counted them in the stream receive handler prior to invoking
710 ** CmiBarrier().
711 **
712 ** TODO: This routine should use spanning trees if this machine layer has
713 ** been configured for that type of message broadcast.
714 */
715 int CmiBarrier ()
716 {
717   VMI_STATUS status;
718
719   CMI_VMI_Barrier_Message_T barrier_msg;
720   PVOID addrs[1];
721   ULONG sz[1];
722
723   int i;
724
725
726   DEBUG_PRINT ("CmiBarrier() called.\n");
727
728   /* Process 0 coordinates the barrier. */
729   if (_Cmi_mype == 0) {
730     /* Wait until all processes send us a barrier message. */
731     while (CMI_VMI_Barrier_Count < (_Cmi_numpes - 1)) {
732       status = VMI_Poll ();
733       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
734     }
735
736     /* Reset the barrier count immediately to set up next barrier operation. */
737     CMI_VMI_Barrier_Count = 0;
738
739     /* Send a barrier message to each process to signal that barrier is finished. */
740     CMI_VMI_MESSAGE_TYPE (&barrier_msg) = CMI_VMI_MESSAGE_TYPE_BARRIER;
741     CMI_VMI_MESSAGE_CREDITS (&barrier_msg) = 0;
742
743 #if CMK_BROADCAST_SPANNING_TREE
744     CMI_SET_BROADCAST_ROOT (&barrier_msg, 0);
745 #endif
746
747     addrs[0] = (PVOID) &barrier_msg;
748     sz[0] = (ULONG) (sizeof (CMI_VMI_Barrier_Message_T));
749
750     for (i = 1; i < _Cmi_numpes; i++) {
751       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, sizeof (CMI_VMI_Barrier_Message_T));
752       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
753     }
754   } else {
755     /* Send a barrier message to Process 0. */
756     CMI_VMI_MESSAGE_TYPE (&barrier_msg) = CMI_VMI_MESSAGE_TYPE_BARRIER;
757     CMI_VMI_MESSAGE_CREDITS (&barrier_msg) = 0;
758
759 #if CMK_BROADCAST_SPANNING_TREE
760     CMI_SET_BROADCAST_ROOT (&barrier_msg, 0);
761 #endif
762
763     addrs[0] = (PVOID) &barrier_msg;
764     sz[0] = (ULONG) (sizeof (CMI_VMI_Barrier_Message_T));
765
766     status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[0])->connection, addrs, sz, 1, sizeof (CMI_VMI_Barrier_Message_T));
767     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
768
769     /* Wait until Process 0 notifies us that barrier is finished. */
770     while (CMI_VMI_Barrier_Count < 1) {
771       status = VMI_Poll ();
772       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
773     }
774
775     /* Reset the barrier count immediately to set up next barrier operation. */
776     CMI_VMI_Barrier_Count = 0;
777   }
778   return 0;
779 }
780
781
782
783 /**************************************************************************
784 **
785 */
786 int CmiBarrierZero ()
787 {
788   DEBUG_PRINT ("CmiBarrierZero() called.\n");
789
790   CmiBarrier ();
791   return 0;
792 }
793
794
795
796 /**************************************************************************
797 **
798 */
799 void CmiSyncSendFn (int destrank, int msgsize, char *msg)
800 {
801   VMI_STATUS status;
802
803   char *msgcopy;
804
805   CMI_VMI_Process_T *process;
806
807   PVOID addrs[2];
808   ULONG sz[2];
809
810   CMI_VMI_Handle_T *handle;
811
812   PVMI_CACHE_ENTRY cacheentry;
813
814   CMI_VMI_Publish_Message_T publish_msg;
815
816   void *context;
817
818   int index;
819
820   PVMI_RDMA_OP rdmaop;
821
822   int offset;
823
824   CMI_VMI_Eager_Short_Slot_Footer_T footer;
825
826
827   DEBUG_PRINT ("CmiSyncSendFn() called.\n");
828
829   if (destrank == _Cmi_mype) {
830     msgcopy = CmiAlloc (msgsize);
831     memcpy (msgcopy, msg, msgsize);
832     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
833     return;
834   }
835
836   process = &CMI_VMI_Processes[destrank];
837
838 #if CMK_BROADCAST_SPANNING_TREE
839   CMI_SET_BROADCAST_ROOT (msg, 0);
840 #endif
841
842   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
843   CMI_VMI_MESSAGE_CREDITS (msg) = process->eager_short_receive_credits_replentish;
844   process->eager_short_receive_credits_replentish = 0;
845
846   // These are assigned here for the Eager long test in the second "if" case.
847   index = process->eager_long_send_size - 1;
848   handle = process->eager_long_send_handles[index];
849
850   if (CMI_VMI_Eager_Protocol && (process->eager_short_send_credits_available > 0) && (msgsize < CMI_VMI_Eager_Short_Message_Boundary)) {
851     index = process->eager_short_send_index;
852     handle = process->eager_short_send_handles[index];
853
854     memcpy (handle->msg, msg, msgsize);
855
856     footer.msgsize = msgsize;
857     footer.sentinel = CMI_VMI_EAGER_SHORT_SENTINEL_DATA;
858     memcpy (handle->msg + msgsize, &footer, sizeof (CMI_VMI_Eager_Short_Slot_Footer_T));
859
860     handle->msgsize = msgsize;
861     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
862
863     cacheentry = handle->data.send.data.eager_short.cacheentry;
864     rdmaop = handle->data.send.data.eager_short.rdmaop;
865
866     offset = handle->data.send.data.eager_short.offset;
867     offset += (CMI_VMI_Eager_Short_Message_Boundary - msgsize);
868
869     rdmaop->numBufs = 1;
870     rdmaop->buffers[0] = cacheentry->bufferHandle;
871     rdmaop->addr[0] = handle->msg;
872     rdmaop->sz[0] = msgsize + sizeof (CMI_VMI_Eager_Short_Slot_Footer_T);
873     rdmaop->rbuffer = handle->data.send.data.eager_short.remote_buffer;
874     rdmaop->roffset = offset;
875     rdmaop->notify = FALSE;
876
877     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) NULL, (VMIRDMACompleteNotification) NULL);
878     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
879
880     process->eager_short_send_index = ((index + 1) % process->eager_short_send_size);
881     process->eager_short_send_credits_available -= 1;
882   } else if (CMI_VMI_Eager_Protocol && (process->eager_long_send_size > 0) && (msgsize < handle->data.send.data.eager_long.maxsize)) {
883     context = CONTEXTFIELD (msg);
884     if (context) {
885       cacheentry = CMI_VMI_CacheEntry_From_Context (context);
886     } else {
887       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
888       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
889     }
890
891     process->eager_long_send_size = index;
892
893     handle->msg = msg;
894     handle->msgsize = msgsize;
895     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
896     handle->data.send.data.eager_long.cacheentry = cacheentry;
897
898     status = VMI_RDMA_Alloc_Op (&rdmaop);
899     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Alloc_Op()");
900
901     rdmaop->numBufs = 1;
902     rdmaop->buffers[0] = cacheentry->bufferHandle;
903     rdmaop->addr[0] = handle->msg;
904     rdmaop->sz[0] = msgsize;
905     rdmaop->rbuffer = handle->data.send.data.eager_long.remote_buffer;
906     rdmaop->roffset = 0;
907     rdmaop->notify = TRUE;
908
909     CMI_VMI_AsyncMsgCount += 1;
910     handle->refcount += 1;
911
912     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) handle, (VMIRDMACompleteNotification) CMI_VMI_RDMA_Put_Completion_Handler);
913     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
914
915     while (handle->refcount > 2) {
916       sched_yield ();
917       status = VMI_Poll ();
918       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
919     }
920
921     if (!context) {
922       status = VMI_Cache_Deregister (cacheentry);
923       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
924     }
925
926     CMI_VMI_Handle_Deallocate (handle);
927   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
928     addrs[0] = (PVOID) msg;
929     sz[0] = (ULONG) msgsize;
930
931     status = VMI_Stream_Send_Inline (process->connection, addrs, sz, 1, msgsize);
932     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
933   } else {
934     if (CMI_VMI_Eager_Protocol) {
935       context = CONTEXTFIELD (msg);
936       if (context) {
937         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
938       } else {
939         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
940         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
941       }
942     } else {
943       context = NULL;
944       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
945       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
946     }
947
948     handle = CMI_VMI_Handle_Allocate ();
949
950     handle->refcount += 1;
951     handle->msg = msg;
952     handle->msgsize = msgsize;
953     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
954     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_RDMAGET;
955     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
956     handle->data.send.data.rdmaget.cacheentry = cacheentry;
957
958     CMI_VMI_AsyncMsgCount += 1;
959     handle->refcount += 1;
960     handle->data.send.data.rdmaget.publishes_pending = 1;
961
962     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
963
964     status = VMI_RDMA_Publish_Buffer (process->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg, (UINT32) msgsize,
965                                       (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
966                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T));
967     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
968
969     while (handle->refcount > 2) {
970       sched_yield ();
971       status = VMI_Poll ();
972       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
973     }
974
975     if (!context) {
976       status = VMI_Cache_Deregister (cacheentry);
977       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
978     }
979
980     CMI_VMI_Handle_Deallocate (handle);
981   }
982 }
983
984 /* FIXME just here for compilation purpose, it may not work */
985 void CmiPushPE(int pe,void *msg)
986 {
987   CdsFifo_Enqueue (CpvAccessOther (CmiLocalQueue, pe), msg);
988 }
989
990
991 /**************************************************************************
992 **
993 */
994 CmiCommHandle CmiAsyncSendFn (int destrank, int msgsize, char *msg)
995 {
996   VMI_STATUS status;
997
998   char *msgcopy;
999
1000   CMI_VMI_Process_T *process;
1001
1002   PVMI_BUFFER bufHandles[2];
1003   PVOID addrs[2];
1004   ULONG sz[2];
1005
1006   CMI_VMI_Handle_T *handle;
1007
1008   PVMI_CACHE_ENTRY cacheentry;
1009
1010   CMI_VMI_Publish_Message_T publish_msg;
1011
1012   void *context;
1013
1014   int index;
1015
1016   PVMI_RDMA_OP rdmaop;
1017
1018   int offset;
1019
1020   CMI_VMI_Eager_Short_Slot_Footer_T footer;
1021
1022
1023   DEBUG_PRINT ("CmiAsyncSendFn() called.\n");
1024
1025   if (destrank == _Cmi_mype) {
1026     msgcopy = CmiAlloc (msgsize);
1027     memcpy (msgcopy, msg, msgsize);
1028     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
1029     return ((CmiCommHandle) NULL);
1030   }
1031
1032   process = &CMI_VMI_Processes[destrank];
1033
1034 #if CMK_BROADCAST_SPANNING_TREE
1035   CMI_SET_BROADCAST_ROOT (msg, 0);
1036 #endif
1037
1038   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
1039   CMI_VMI_MESSAGE_CREDITS (msg) = process->eager_short_receive_credits_replentish;
1040   process->eager_short_receive_credits_replentish = 0;
1041
1042   // These are assigned here for the Eager long test in the second "if" case.
1043   index = process->eager_long_send_size - 1;
1044   handle = process->eager_long_send_handles[index];
1045
1046   if (CMI_VMI_Eager_Protocol && (process->eager_short_send_credits_available > 0) && (msgsize < CMI_VMI_Eager_Short_Message_Boundary)) {
1047     index = process->eager_short_send_index;
1048     handle = process->eager_short_send_handles[index];
1049
1050     memcpy (handle->msg, msg, msgsize);
1051
1052     footer.msgsize = msgsize;
1053     footer.sentinel = CMI_VMI_EAGER_SHORT_SENTINEL_DATA;
1054     memcpy (handle->msg + msgsize, &footer, sizeof (CMI_VMI_Eager_Short_Slot_Footer_T));
1055
1056     handle->msgsize = msgsize;
1057     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1058
1059     cacheentry = handle->data.send.data.eager_short.cacheentry;
1060     rdmaop = handle->data.send.data.eager_short.rdmaop;
1061
1062     offset = handle->data.send.data.eager_short.offset;
1063     offset += (CMI_VMI_Eager_Short_Message_Boundary - msgsize);
1064
1065     rdmaop->numBufs = 1;
1066     rdmaop->buffers[0] = cacheentry->bufferHandle;
1067     rdmaop->addr[0] = handle->msg;
1068     rdmaop->sz[0] = msgsize + sizeof (CMI_VMI_Eager_Short_Slot_Footer_T);
1069     rdmaop->rbuffer = handle->data.send.data.eager_short.remote_buffer;
1070     rdmaop->roffset = offset;
1071     rdmaop->notify = FALSE;
1072
1073     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) NULL, (VMIRDMACompleteNotification) NULL);
1074     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
1075
1076     process->eager_short_send_index = ((index + 1) % process->eager_short_send_size);
1077     process->eager_short_send_credits_available -= 1;
1078
1079     handle = NULL;
1080   } else if (CMI_VMI_Eager_Protocol && (process->eager_long_send_size > 0) && (msgsize < handle->data.send.data.eager_long.maxsize)) {
1081     context = CONTEXTFIELD (msg);
1082     if (context) {
1083       cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1084     } else {
1085       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1086       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1087     }
1088
1089     process->eager_long_send_size = index;
1090
1091     handle->msg = msg;
1092     handle->msgsize = msgsize;
1093     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1094     handle->data.send.data.eager_long.cacheentry = cacheentry;
1095
1096     status = VMI_RDMA_Alloc_Op (&rdmaop);
1097     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Alloc_Op()");
1098
1099     rdmaop->numBufs = 1;
1100     rdmaop->buffers[0] = cacheentry->bufferHandle;
1101     rdmaop->addr[0] = handle->msg;
1102     rdmaop->sz[0] = msgsize;
1103     rdmaop->rbuffer = handle->data.send.data.eager_long.remote_buffer;
1104     rdmaop->roffset = 0;
1105     rdmaop->notify = TRUE;
1106
1107     CMI_VMI_AsyncMsgCount += 1;
1108     handle->refcount += 1;
1109
1110     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) handle, (VMIRDMACompleteNotification) CMI_VMI_RDMA_Put_Completion_Handler);
1111     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
1112   } else if (msgsize < CMI_VMI_Small_Message_Boundary) {
1113     addrs[0] = (PVOID) msg;
1114     sz[0] = msgsize;
1115
1116     status = VMI_Stream_Send_Inline (process->connection, addrs, sz, 1, msgsize);
1117     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1118
1119     handle = NULL;
1120   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
1121     if (CMI_VMI_Eager_Protocol) {
1122       context = CONTEXTFIELD (msg);
1123       if (context) {
1124         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1125       } else {
1126         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1127         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1128       }
1129     } else {
1130       context = NULL;
1131       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1132       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1133     }
1134
1135     handle = CMI_VMI_Handle_Allocate ();
1136
1137     handle->refcount += 1;
1138     handle->msg = msg;
1139     handle->msgsize = msgsize;
1140     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1141     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_STREAM;
1142     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1143     handle->data.send.data.stream.cacheentry = cacheentry;
1144
1145     bufHandles[0] = cacheentry->bufferHandle;
1146     addrs[0] = (PVOID) msg;
1147     sz[0] = msgsize;
1148
1149     CMI_VMI_AsyncMsgCount += 1;
1150     handle->refcount += 1;
1151
1152     status = VMI_Stream_Send (process->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1153     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1154   } else {
1155     if (CMI_VMI_Eager_Protocol) {
1156       context = CONTEXTFIELD (msg);
1157       if (context) {
1158         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1159       } else {
1160         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1161         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1162       }
1163     } else {
1164       context = NULL;
1165       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1166       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1167     }
1168
1169     handle = CMI_VMI_Handle_Allocate ();
1170
1171     handle->refcount += 1;
1172     handle->msg = msg;
1173     handle->msgsize = msgsize;
1174     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1175     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_RDMAGET;
1176     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1177     handle->data.send.data.rdmaget.cacheentry = cacheentry;
1178
1179     handle->refcount += 1;
1180     CMI_VMI_AsyncMsgCount += 1;
1181     handle->data.send.data.rdmaget.publishes_pending = 1;
1182
1183     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
1184
1185 #if CMI_VMI_USE_VMI22
1186     status = VMI_RDMA_Publish_Buffer_With_Callback (process->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg, (UINT32) msgsize,
1187                                                     (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1188                                                     (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
1189     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
1190
1191     while (handle->data.send.data.rdmaget.publishes_pending > 0) {
1192       sched_yield ();
1193       status = VMI_Poll ();
1194       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
1195     }
1196 #else
1197     status = VMI_RDMA_Publish_Buffer (process->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg, (UINT32) msgsize,
1198                                       (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1199                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1200     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1201 #endif
1202   }
1203
1204   return ((CmiCommHandle) handle);
1205 }
1206
1207
1208
1209 /**************************************************************************
1210 **
1211 */
1212 void CmiFreeSendFn (int destrank, int msgsize, char *msg)
1213 {
1214   VMI_STATUS status;
1215
1216   char *msgcopy;
1217
1218   CMI_VMI_Process_T *process;
1219
1220   PVMI_BUFFER bufHandles[2];
1221   PVOID addrs[2];
1222   ULONG sz[2];
1223
1224   CMI_VMI_Handle_T *handle;
1225
1226   PVMI_CACHE_ENTRY cacheentry;
1227
1228   CMI_VMI_Publish_Message_T publish_msg;
1229
1230   void *context;
1231
1232   int index;
1233
1234   PVMI_RDMA_OP rdmaop;
1235
1236   int offset;
1237
1238   CMI_VMI_Eager_Short_Slot_Footer_T footer;
1239
1240
1241   DEBUG_PRINT ("CmiFreeSendFn() called.\n");
1242
1243   if (destrank == _Cmi_mype) {
1244     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msg);
1245     return;
1246   }
1247
1248   process = &CMI_VMI_Processes[destrank];
1249
1250 #if CMK_BROADCAST_SPANNING_TREE
1251   CMI_SET_BROADCAST_ROOT (msg, 0);
1252 #endif
1253
1254   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
1255   CMI_VMI_MESSAGE_CREDITS (msg) = process->eager_short_receive_credits_replentish;
1256   process->eager_short_receive_credits_replentish = 0;
1257
1258   // These are assigned here for the Eager long test in the second "if" case.
1259   index = process->eager_long_send_size - 1;
1260   handle = process->eager_long_send_handles[index];
1261
1262   if (CMI_VMI_Eager_Protocol && (process->eager_short_send_credits_available > 0) && (msgsize < CMI_VMI_Eager_Short_Message_Boundary)) {
1263     index = process->eager_short_send_index;
1264     handle = process->eager_short_send_handles[index];
1265
1266     memcpy (handle->msg, msg, msgsize);
1267
1268     footer.msgsize = msgsize;
1269     footer.sentinel = CMI_VMI_EAGER_SHORT_SENTINEL_DATA;
1270     memcpy (handle->msg + msgsize, &footer, sizeof (CMI_VMI_Eager_Short_Slot_Footer_T));
1271
1272     handle->msgsize = msgsize;
1273     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1274
1275     cacheentry = handle->data.send.data.eager_short.cacheentry;
1276     rdmaop = handle->data.send.data.eager_short.rdmaop;
1277
1278     offset = handle->data.send.data.eager_short.offset;
1279     offset += (CMI_VMI_Eager_Short_Message_Boundary - msgsize);
1280
1281     rdmaop->numBufs = 1;
1282     rdmaop->buffers[0] = cacheentry->bufferHandle;
1283     rdmaop->addr[0] = handle->msg;
1284     rdmaop->sz[0] = msgsize + sizeof (CMI_VMI_Eager_Short_Slot_Footer_T);
1285     rdmaop->rbuffer = handle->data.send.data.eager_short.remote_buffer;
1286     rdmaop->roffset = offset;
1287     rdmaop->notify = FALSE;
1288
1289     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) NULL, (VMIRDMACompleteNotification) NULL);
1290     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
1291
1292     process->eager_short_send_index = ((index + 1) % process->eager_short_send_size);
1293     process->eager_short_send_credits_available -= 1;
1294
1295     CmiFree (msg);
1296   } else if (CMI_VMI_Eager_Protocol && (process->eager_long_send_size > 0) && (msgsize < handle->data.send.data.eager_long.maxsize)) {
1297     context = CONTEXTFIELD (msg);
1298     if (context) {
1299       cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1300     } else {
1301       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1302       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1303     }
1304
1305     process->eager_long_send_size = index;
1306
1307     handle->msg = msg;
1308     handle->msgsize = msgsize;
1309     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_FREE;
1310     handle->data.send.data.eager_long.cacheentry = cacheentry;
1311
1312     status = VMI_RDMA_Alloc_Op (&rdmaop);
1313     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Alloc_Op()");
1314
1315     rdmaop->numBufs = 1;
1316     rdmaop->buffers[0] = cacheentry->bufferHandle;
1317     rdmaop->addr[0] = handle->msg;
1318     rdmaop->sz[0] = msgsize;
1319     rdmaop->rbuffer = handle->data.send.data.eager_long.remote_buffer;
1320     rdmaop->roffset = 0;
1321     rdmaop->notify = TRUE;
1322
1323     CMI_VMI_AsyncMsgCount += 1;
1324     /* Do NOT increment handle->refcount here! */
1325
1326     status = VMI_RDMA_Put (process->connection, rdmaop, (PVOID) handle, (VMIRDMACompleteNotification) CMI_VMI_RDMA_Put_Completion_Handler);
1327     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Put()");
1328   } else if (msgsize < CMI_VMI_Small_Message_Boundary) {
1329     addrs[0] = (PVOID) msg;
1330     sz[0] = msgsize;
1331
1332     status = VMI_Stream_Send_Inline (process->connection, addrs, sz, 1, msgsize);
1333     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1334
1335     CmiFree (msg);
1336   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
1337     if (CMI_VMI_Eager_Protocol) {
1338       context = CONTEXTFIELD (msg);
1339       if (context) {
1340         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1341       } else {
1342         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1343         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1344       }
1345     } else {
1346       context = NULL;
1347       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1348       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1349     }
1350
1351     handle = CMI_VMI_Handle_Allocate ();
1352
1353     /* Do NOT increment handle->refcount here! */
1354     handle->msg = msg;
1355     handle->msgsize = msgsize;
1356     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1357     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_STREAM;
1358     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_FREE;
1359     handle->data.send.data.stream.cacheentry = cacheentry;
1360
1361     bufHandles[0] = cacheentry->bufferHandle;
1362     addrs[0] = (PVOID) msg;
1363     sz[0] = msgsize;
1364
1365     handle->refcount += 1;
1366     CMI_VMI_AsyncMsgCount += 1;
1367
1368     status = VMI_Stream_Send (process->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1369     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1370   } else {
1371     if (CMI_VMI_Eager_Protocol) {
1372       context = CONTEXTFIELD (msg);
1373       if (context) {
1374         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1375       } else {
1376         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1377         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1378       }
1379     } else {
1380       context = NULL;
1381       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1382       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1383     }
1384
1385     handle = CMI_VMI_Handle_Allocate ();
1386
1387     /* Do NOT increment handle->refcount here! */
1388     handle->msg = msg;
1389     handle->msgsize = msgsize;
1390     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1391     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_RDMAGET;
1392     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_FREE;
1393     handle->data.send.data.rdmaget.cacheentry = cacheentry;
1394
1395     handle->refcount += 1;
1396     CMI_VMI_AsyncMsgCount += 1;
1397     handle->data.send.data.rdmaget.publishes_pending = 1;
1398
1399     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
1400
1401 #if CMI_VMI_USE_VMI22
1402     status = VMI_RDMA_Publish_Buffer_With_Callback (process->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg, (UINT32) msgsize,
1403                                                     (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1404                                                     (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
1405     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
1406
1407     while (handle->data.send.data.rdmaget.publishes_pending > 0) {
1408       sched_yield ();
1409       status = VMI_Poll ();
1410       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
1411     }
1412 #else
1413     status = VMI_RDMA_Publish_Buffer (process->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg, (UINT32) msgsize,
1414                                       (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1415                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1416     CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1417 #endif
1418   }
1419 }
1420
1421
1422
1423 /**************************************************************************
1424 **
1425 */
1426 void CmiSyncBroadcastFn (int msgsize, char *msg)
1427 {
1428   VMI_STATUS status;
1429
1430   PVMI_BUFFER bufHandles[2];
1431   PVOID addrs[2];
1432   ULONG sz[2];
1433
1434   CMI_VMI_Handle_T *handle;
1435
1436   int i;
1437
1438   PVMI_CACHE_ENTRY cacheentry;
1439
1440   int childcount;
1441   int startrank;
1442   int destrank;
1443
1444   CMI_VMI_Publish_Message_T publish_msg;
1445
1446   void *context;
1447
1448
1449   DEBUG_PRINT ("CmiSyncBroadcastFn() called.\n");
1450
1451   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
1452   CMI_VMI_MESSAGE_CREDITS (msg) = 0;
1453
1454   if (msgsize < CMI_VMI_Medium_Message_Boundary) {
1455     if (CMI_VMI_Eager_Protocol) {
1456       context = CONTEXTFIELD (msg);
1457       if (context) {
1458         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1459       } else {
1460         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1461         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1462       }
1463     } else {
1464       context = NULL;
1465       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1466       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1467     }
1468
1469     handle = CMI_VMI_Handle_Allocate ();
1470
1471     handle->refcount += 1;
1472     handle->msg = msg;
1473     handle->msgsize = msgsize;
1474     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1475     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_STREAM;
1476     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1477     handle->data.send.data.stream.cacheentry = cacheentry;
1478
1479     bufHandles[0] = cacheentry->bufferHandle;
1480     addrs[0] = (PVOID) msg;
1481     sz[0] = (ULONG) msgsize;
1482
1483 #if CMK_BROADCAST_SPANNING_TREE
1484     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1485
1486     childcount = CMI_VMI_Spanning_Children_Count (msg);
1487
1488     handle->refcount += childcount;
1489     CMI_VMI_AsyncMsgCount += childcount;
1490
1491     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1492     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1493       destrank = _Cmi_mype - startrank;
1494
1495       if (destrank < 0) {
1496         destrank += _Cmi_numpes;
1497       }
1498
1499       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1500
1501       if (destrank > (_Cmi_numpes - 1)) {
1502         break;
1503       }
1504
1505       destrank += startrank;
1506       destrank %= _Cmi_numpes;
1507
1508       status = VMI_Stream_Send ((&CMI_VMI_Processes[destrank])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1509       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1510     }
1511 #else
1512     handle->refcount += (_Cmi_numpes - 1);
1513     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
1514
1515     for (i = 0; i < _Cmi_mype; i++) {
1516       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1517       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1518     }
1519
1520     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1521       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1522       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1523     }
1524 #endif
1525
1526     while (handle->refcount > 2) {
1527       sched_yield ();
1528       status = VMI_Poll ();
1529       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
1530     }
1531
1532     if (!context) {
1533       status = VMI_Cache_Deregister (cacheentry);
1534       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
1535     }
1536
1537     CMI_VMI_Handle_Deallocate (handle);
1538   } else {
1539     if (CMI_VMI_Eager_Protocol) {
1540       context = CONTEXTFIELD (msg);
1541       if (context) {
1542         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1543       } else {
1544         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1545         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1546       }
1547     } else {
1548       context = NULL;
1549       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1550       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1551     }
1552
1553     handle = CMI_VMI_Handle_Allocate ();
1554
1555     handle->refcount += 1;
1556     handle->msg = msg;
1557     handle->msgsize = msgsize;
1558     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1559     handle->data.send.send_handle_type=CMI_VMI_SEND_HANDLE_TYPE_RDMABROADCAST;
1560     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1561     handle->data.send.data.rdmabroadcast.cacheentry = cacheentry;
1562
1563     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
1564
1565 #if CMK_BROADCAST_SPANNING_TREE
1566     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1567
1568     childcount = CMI_VMI_Spanning_Children_Count (msg);
1569
1570     handle->refcount += childcount;
1571     CMI_VMI_AsyncMsgCount += childcount;
1572     handle->data.send.data.rdmabroadcast.publishes_pending = childcount;
1573
1574     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1575     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1576       destrank = _Cmi_mype - startrank;
1577
1578       if (destrank < 0) {
1579         destrank += _Cmi_numpes;
1580       }
1581
1582       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1583
1584       if (destrank > (_Cmi_numpes - 1)) {
1585         break;
1586       }
1587
1588       destrank += startrank;
1589       destrank %= _Cmi_numpes;
1590
1591       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[destrank])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1592                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1593                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1594       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1595     }
1596 #else
1597     handle->refcount += (_Cmi_numpes - 1);
1598     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
1599     handle->data.send.data.rdmabroadcast.publishes_pending = (_Cmi_numpes - 1);
1600
1601     for (i = 0; i < _Cmi_mype; i++) {
1602       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1603                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1604                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1605       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1606     }
1607
1608     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1609       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1610                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1611                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1612       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1613     }
1614 #endif
1615
1616     while (handle->refcount > 2) {
1617       sched_yield ();
1618       status = VMI_Poll ();
1619       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
1620     }
1621
1622     if (!context) {
1623       status = VMI_Cache_Deregister (cacheentry);
1624       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
1625     }
1626
1627     CMI_VMI_Handle_Deallocate (handle);
1628   }
1629 }
1630
1631
1632
1633 /**************************************************************************
1634 **
1635 */
1636 CmiCommHandle CmiAsyncBroadcastFn (int msgsize, char *msg)
1637 {
1638   VMI_STATUS status;
1639
1640   PVMI_BUFFER bufHandles[2];
1641   PVOID addrs[2];
1642   ULONG sz[2];
1643
1644   CMI_VMI_Handle_T *handle;
1645
1646   int i;
1647
1648   PVMI_CACHE_ENTRY cacheentry;
1649
1650   int childcount;
1651   int startrank;
1652   int destrank;
1653
1654   CMI_VMI_Publish_Message_T publish_msg;
1655
1656   void *context;
1657
1658
1659   DEBUG_PRINT ("CmiAsyncBroadcastFn() called.\n");
1660
1661   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
1662   CMI_VMI_MESSAGE_CREDITS (msg) = 0;
1663
1664   if (msgsize < CMI_VMI_Small_Message_Boundary) {
1665     addrs[0] = (PVOID) msg;
1666     sz[0] = msgsize;
1667
1668 #if CMK_BROADCAST_SPANNING_TREE
1669     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1670
1671     childcount = CMI_VMI_Spanning_Children_Count (msg);
1672
1673     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1674     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1675       destrank = _Cmi_mype - startrank;
1676
1677       if (destrank < 0) {
1678         destrank += _Cmi_numpes;
1679       }
1680
1681       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1682
1683       if (destrank > (_Cmi_numpes - 1)) {
1684         break;
1685       }
1686
1687       destrank += startrank;
1688       destrank %= _Cmi_numpes;
1689
1690       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[destrank])->connection, addrs, sz, 1, msgsize);
1691       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1692     }
1693 #else
1694     for (i = 0; i < _Cmi_mype; i++) {
1695       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, msgsize);
1696       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1697     }
1698
1699     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1700       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, msgsize);
1701       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1702     }
1703 #endif
1704
1705     handle = NULL;
1706   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
1707     if (CMI_VMI_Eager_Protocol) {
1708       context = CONTEXTFIELD (msg);
1709       if (context) {
1710         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1711       } else {
1712         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1713         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1714       }
1715     } else {
1716       context = NULL;
1717       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1718       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1719     }
1720
1721     handle = CMI_VMI_Handle_Allocate ();
1722
1723     handle->refcount += 1;
1724     handle->msg = msg;
1725     handle->msgsize = msgsize;
1726     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1727     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_STREAM;
1728     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1729     handle->data.send.data.stream.cacheentry = cacheentry;
1730
1731     bufHandles[0] = cacheentry->bufferHandle;
1732     addrs[0] = (PVOID) msg;
1733     sz[0] = (ULONG) msgsize;
1734
1735 #if CMK_BROADCAST_SPANNING_TREE
1736     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1737
1738     childcount = CMI_VMI_Spanning_Children_Count (msg);
1739
1740     handle->refcount += childcount;
1741     CMI_VMI_AsyncMsgCount += childcount;
1742
1743     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1744     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1745       destrank = _Cmi_mype - startrank;
1746
1747       if (destrank < 0) {
1748         destrank += _Cmi_numpes;
1749       }
1750
1751       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1752
1753       if (destrank > (_Cmi_numpes - 1)) {
1754         break;
1755       }
1756
1757       destrank += startrank;
1758       destrank %= _Cmi_numpes;
1759
1760       status = VMI_Stream_Send ((&CMI_VMI_Processes[destrank])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1761       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1762     }
1763 #else
1764     handle->refcount += (_Cmi_numpes - 1);
1765     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
1766
1767     for (i = 0; i < _Cmi_mype; i++) {
1768       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1769       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1770     }
1771
1772     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1773       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
1774       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
1775     }
1776 #endif
1777   } else {
1778     if (CMI_VMI_Eager_Protocol) {
1779       context = CONTEXTFIELD (msg);
1780       if (context) {
1781         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1782       } else {
1783         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1784         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1785       }
1786     } else {
1787       context = NULL;
1788       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1789       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1790     }
1791
1792     handle = CMI_VMI_Handle_Allocate ();
1793
1794     handle->refcount += 1;
1795     handle->msg = msg;
1796     handle->msgsize = msgsize;
1797     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1798     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_RDMABROADCAST;
1799     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_NONE;
1800     handle->data.send.data.rdmabroadcast.cacheentry = cacheentry;
1801
1802     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
1803
1804 #if CMK_BROADCAST_SPANNING_TREE
1805     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1806
1807     childcount = CMI_VMI_Spanning_Children_Count (msg);
1808
1809     handle->refcount += childcount;
1810     CMI_VMI_AsyncMsgCount += childcount;
1811     handle->data.send.data.rdmabroadcast.publishes_pending = childcount;
1812
1813     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1814     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1815       destrank = _Cmi_mype - startrank;
1816
1817       if (destrank < 0) {
1818         destrank += _Cmi_numpes;
1819       }
1820
1821       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1822
1823       if (destrank > (_Cmi_numpes - 1)) {
1824         break;
1825       }
1826
1827       destrank += startrank;
1828       destrank %= _Cmi_numpes;
1829
1830 #if CMI_VMI_USE_VMI22
1831       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[destrank])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1832                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1833                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
1834       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
1835 #else
1836       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[destrank])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1837                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1838                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1839       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1840 #endif
1841     }
1842 #else
1843     handle->refcount += (_Cmi_numpes - 1);
1844     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
1845     handle->data.send.data.rdmabroadcast.publishes_pending = (_Cmi_numpes - 1);
1846
1847     for (i = 0; i < _Cmi_mype; i++) {
1848 #if CMI_VMI_USE_VMI22
1849       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1850                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1851                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
1852       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
1853 #else
1854       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1855                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1856                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1857       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1858 #endif
1859     }
1860
1861     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1862 #if CMI_VMI_USE_VMI22
1863       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1864                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1865                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
1866       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
1867 #else
1868       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
1869                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
1870                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
1871       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
1872 #endif
1873     }
1874 #endif
1875
1876 #if CMI_VMI_USE_VMI22
1877     while (handle->data.send.data.rdmabroadcast.publishes_pending > 0) {
1878       sched_yield ();
1879       status = VMI_Poll ();
1880       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
1881     }
1882 #endif
1883   }
1884
1885   return ((CmiCommHandle) handle);
1886 }
1887
1888
1889
1890 /**************************************************************************
1891 **
1892 */
1893 void CmiFreeBroadcastFn (int msgsize, char *msg)
1894 {
1895   VMI_STATUS status;
1896
1897   PVMI_BUFFER bufHandles[2];
1898   PVOID addrs[2];
1899   ULONG sz[2];
1900
1901   CMI_VMI_Handle_T *handle;
1902
1903   int i;
1904
1905   PVMI_CACHE_ENTRY cacheentry;
1906
1907   int childcount;
1908   int startrank;
1909   int destrank;
1910
1911   CMI_VMI_Publish_Message_T publish_msg;
1912
1913   void *context;
1914
1915
1916   DEBUG_PRINT ("CmiFreeBroadcastFn() called.\n");
1917
1918   CMI_VMI_MESSAGE_TYPE (msg) = CMI_VMI_MESSAGE_TYPE_STANDARD;
1919   CMI_VMI_MESSAGE_CREDITS (msg) = 0;
1920
1921   if (msgsize < CMI_VMI_Small_Message_Boundary) {
1922     addrs[0] = (PVOID) msg;
1923     sz[0] = msgsize;
1924
1925 #if CMK_BROADCAST_SPANNING_TREE
1926     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1927
1928     childcount = CMI_VMI_Spanning_Children_Count (msg);
1929
1930     startrank = CMI_BROADCAST_ROOT (msg) - 1;
1931     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
1932       destrank = _Cmi_mype - startrank;
1933
1934       if (destrank < 0) {
1935         destrank += _Cmi_numpes;
1936       }
1937
1938       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
1939
1940       if (destrank > (_Cmi_numpes - 1)) {
1941         break;
1942       }
1943
1944       destrank += startrank;
1945       destrank %= _Cmi_numpes;
1946
1947       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[destrank])->connection, addrs, sz, 1, msgsize);
1948       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1949     }
1950 #else
1951     for (i = 0; i < _Cmi_mype; i++) {
1952       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, msgsize);
1953       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1954     }
1955
1956     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
1957       status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, msgsize);
1958       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
1959     }
1960 #endif
1961
1962     CmiFree (msg);
1963   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
1964     if (CMI_VMI_Eager_Protocol) {
1965       context = CONTEXTFIELD (msg);
1966       if (context) {
1967         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
1968       } else {
1969         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1970         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1971       }
1972     } else {
1973       context = NULL;
1974       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
1975       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
1976     }
1977
1978     handle = CMI_VMI_Handle_Allocate ();
1979
1980     /* Do NOT increment handle->refcount here! */
1981     handle->msg = msg;
1982     handle->msgsize = msgsize;
1983     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
1984     handle->data.send.send_handle_type = CMI_VMI_SEND_HANDLE_TYPE_STREAM;
1985     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_FREE;
1986     handle->data.send.data.stream.cacheentry = cacheentry;
1987
1988     bufHandles[0] = cacheentry->bufferHandle;
1989     addrs[0] = (PVOID) msg;
1990     sz[0] = (ULONG) msgsize;
1991
1992 #if CMK_BROADCAST_SPANNING_TREE
1993     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
1994
1995     childcount = CMI_VMI_Spanning_Children_Count (msg);
1996
1997     handle->refcount += childcount;
1998     CMI_VMI_AsyncMsgCount += childcount;
1999
2000     startrank = CMI_BROADCAST_ROOT (msg) - 1;
2001     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
2002       destrank = _Cmi_mype - startrank;
2003
2004       if (destrank < 0) {
2005         destrank += _Cmi_numpes;
2006       }
2007
2008       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
2009
2010       if (destrank > (_Cmi_numpes - 1)) {
2011         break;
2012       }
2013
2014       destrank += startrank;
2015       destrank %= _Cmi_numpes;
2016
2017       status = VMI_Stream_Send ((&CMI_VMI_Processes[destrank])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
2018       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
2019     }
2020 #else
2021     handle->refcount += (_Cmi_numpes - 1);
2022     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
2023
2024     for (i = 0; i < _Cmi_mype; i++) {
2025       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
2026       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
2027     }
2028
2029     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
2030       status = VMI_Stream_Send ((&CMI_VMI_Processes[i])->connection, bufHandles, addrs, sz, 1, CMI_VMI_Stream_Completion_Handler, (PVOID) handle, TRUE);
2031       CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send()");
2032     }
2033 #endif
2034   } else {
2035     if (CMI_VMI_Eager_Protocol) {
2036       context = CONTEXTFIELD (msg);
2037       if (context) {
2038         cacheentry = CMI_VMI_CacheEntry_From_Context (context);
2039       } else {
2040         status = VMI_Cache_Register (msg, msgsize, &cacheentry);
2041         CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
2042       }
2043     } else {
2044       context = NULL;
2045       status = VMI_Cache_Register (msg, msgsize, &cacheentry);
2046       CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Register()");
2047     }
2048
2049     handle = CMI_VMI_Handle_Allocate ();
2050
2051     /* Do NOT increment handle->refcount here! */
2052     handle->msg = msg;
2053     handle->msgsize = msgsize;
2054     handle->handle_type = CMI_VMI_HANDLE_TYPE_SEND;
2055     handle->data.send.send_handle_type=CMI_VMI_SEND_HANDLE_TYPE_RDMABROADCAST;
2056     handle->data.send.message_disposition = CMI_VMI_MESSAGE_DISPOSITION_FREE;
2057     handle->data.send.data.rdmabroadcast.cacheentry = cacheentry;
2058
2059     publish_msg.type = CMI_VMI_PUBLISH_TYPE_GET;
2060
2061 #if CMK_BROADCAST_SPANNING_TREE
2062     CMI_SET_BROADCAST_ROOT (msg, (_Cmi_mype + 1));
2063
2064     childcount = CMI_VMI_Spanning_Children_Count (msg);
2065
2066     handle->refcount += childcount;
2067     CMI_VMI_AsyncMsgCount += childcount;
2068     handle->data.send.data.rdmabroadcast.publishes_pending = childcount;
2069
2070     startrank = CMI_BROADCAST_ROOT (msg) - 1;
2071     for (i = 1; i <= CMI_VMI_BROADCAST_SPANNING_FACTOR; i++) {
2072       destrank = _Cmi_mype - startrank;
2073
2074       if (destrank < 0) {
2075         destrank += _Cmi_numpes;
2076       }
2077
2078       destrank = CMI_VMI_BROADCAST_SPANNING_FACTOR * destrank + i;
2079
2080       if (destrank > (_Cmi_numpes - 1)) {
2081         break;
2082       }
2083
2084       destrank += startrank;
2085       destrank %= _Cmi_numpes;
2086
2087 #if CMI_VMI_USE_VMI22
2088       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[destrank])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2089                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2090                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
2091       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
2092 #else
2093       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[destrank])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2094                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2095                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
2096       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
2097 #endif
2098     }
2099 #else
2100     handle->refcount += (_Cmi_numpes - 1);
2101     CMI_VMI_AsyncMsgCount += (_Cmi_numpes - 1);
2102     handle->data.send.data.rdmabroadcast.publishes_pending = (_Cmi_numpes - 1);
2103
2104     for (i = 0; i < _Cmi_mype; i++) {
2105 #if CMI_VMI_USE_VMI22
2106       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2107                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2108                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
2109       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
2110 #else
2111       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2112                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2113                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
2114       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
2115 #endif
2116     }
2117
2118     for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
2119 #if CMI_VMI_USE_VMI22
2120       status = VMI_RDMA_Publish_Buffer_With_Callback ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2121                                                       (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2122                                                       (ULONG) sizeof (CMI_VMI_Publish_Message_T), (PVOID) handle, CMI_VMI_RDMA_Publish_Completion_Handler);
2123       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer_With_Callback()");
2124 #else
2125       status = VMI_RDMA_Publish_Buffer ((&CMI_VMI_Processes[i])->connection, cacheentry->bufferHandle, (VMI_virt_addr_t) (VMI_ADDR_CAST) msg,
2126                                         (UINT32) msgsize, (VMI_virt_addr_t) (VMI_ADDR_CAST) NULL, (UINT32) handle->index, (PVOID) &publish_msg,
2127                                         (ULONG) sizeof (CMI_VMI_Publish_Message_T));
2128       CMI_VMI_CHECK_SUCCESS (status, "VMI_RDMA_Publish_Buffer()");
2129 #endif
2130     }
2131 #endif
2132
2133 #if CMI_VMI_USE_VMI22
2134     while (handle->data.send.data.rdmabroadcast.publishes_pending > 0) {
2135       sched_yield ();
2136       status = VMI_Poll ();
2137       CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
2138     }
2139 #endif
2140   }
2141 }
2142
2143
2144
2145 /**************************************************************************
2146 **
2147 */
2148 void CmiSyncBroadcastAllFn (int msgsize, char *msg)
2149 {
2150   char *msgcopy;
2151
2152
2153   DEBUG_PRINT ("CmiSyncBroadcastAllFn() called.\n");
2154
2155   msgcopy = CmiAlloc (msgsize);
2156   memcpy (msgcopy, msg, msgsize);
2157   CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
2158
2159   CmiSyncBroadcastFn (msgsize, msg);
2160 }
2161
2162
2163
2164 /**************************************************************************
2165 **
2166 */
2167 CmiCommHandle CmiAsyncBroadcastAllFn (int msgsize, char *msg)
2168 {
2169   char *msgcopy;
2170
2171
2172   DEBUG_PRINT ("CmiAsyncBroadcastAllFn() called.\n");
2173
2174   msgcopy = CmiAlloc (msgsize);
2175   memcpy (msgcopy, msg, msgsize);
2176   CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
2177
2178   return (CmiAsyncBroadcastFn (msgsize, msg));
2179 }
2180
2181
2182
2183 /**************************************************************************
2184 ** The idea here is that for short messages, just do a sync broadcast
2185 ** since internally all of the sends for short messages happen
2186 ** synchronously anyway and these are pretty quick due to high memory
2187 ** bus bandwidth; for medium messages, copy the message and enqueue the
2188 ** copy locally and then call FreeBroadcast to send the message to the
2189 ** other processes asynchronously (and free the message at an idle point
2190 ** in the future when doing periodic resource cleanup); for large messages,
2191 ** send synchronously to all other processes and then enqueue the actual
2192 ** message to avoid copying it.
2193 */
2194 void CmiFreeBroadcastAllFn (int msgsize, char *msg)
2195 {
2196   char *msgcopy;
2197
2198
2199   DEBUG_PRINT ("CmiFreeBroadcastAllFn() called.\n");
2200
2201 #if CMK_BROADCAST_SPANNING_TREE
2202   if (msgsize < CMI_VMI_Small_Message_Boundary) {
2203     CmiSyncBroadcastFn (msgsize, msg);
2204     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msg);
2205   } else if (msgsize < CMI_VMI_Medium_Message_Boundary) {
2206     msgcopy = CmiAlloc (msgsize);
2207     memcpy (msgcopy, msg, msgsize);
2208     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
2209
2210     CmiFreeBroadcastFn (msgsize, msg);
2211   } else {
2212     CmiSyncBroadcastFn (msgsize, msg);
2213     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msg);
2214   }
2215 #else
2216   if (msgsize < CMI_VMI_Medium_Message_Boundary) {
2217     msgcopy = CmiAlloc (msgsize);
2218     memcpy (msgcopy, msg, msgsize);
2219     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msgcopy);
2220
2221     CmiFreeBroadcastFn (msgsize, msg);
2222   } else {
2223     CmiSyncBroadcastFn (msgsize, msg);
2224     CdsFifo_Enqueue (CpvAccess (CmiLocalQueue), msg);
2225   }
2226 #endif
2227 }
2228
2229
2230
2231 /**************************************************************************
2232 **
2233 */
2234 int CmiAsyncMsgSent (CmiCommHandle commhandle)
2235 {
2236   CMI_VMI_Handle_T *handle;
2237
2238
2239   DEBUG_PRINT ("CmiAsyncMsgSent() called.\n");
2240
2241   if (commhandle) {
2242     handle = (CMI_VMI_Handle_T *) commhandle;
2243     return (handle->refcount <= 2);
2244   }
2245
2246   return (TRUE);
2247 }
2248
2249
2250
2251 /**************************************************************************
2252 **
2253 */
2254 int CmiAllAsyncMsgsSent ()
2255 {
2256   DEBUG_PRINT ("CmiAllAsyncMsgsSent() called.\n");
2257
2258   return (CMI_VMI_AsyncMsgCount < 1);
2259 }
2260
2261
2262
2263 /**************************************************************************
2264 **
2265 */
2266 void CmiReleaseCommHandle (CmiCommHandle commhandle)
2267 {
2268   VMI_STATUS status;
2269
2270   CMI_VMI_Handle_T *handle;
2271
2272   void *context;
2273
2274   int i;
2275
2276
2277   DEBUG_PRINT ("CmiReleaseCommHandle() called.\n");
2278
2279   if (commhandle) {
2280     handle = (CMI_VMI_Handle_T *) commhandle;
2281     handle->refcount -= 1;
2282
2283     if (handle->refcount <= 1) {
2284       if (handle->data.send.send_handle_type == CMI_VMI_SEND_HANDLE_TYPE_STREAM) {
2285         if (CMI_VMI_Eager_Protocol) {
2286           context = CONTEXTFIELD (handle->msg);
2287           if (!context) {
2288             status = VMI_Cache_Deregister (handle->data.send.data.stream.cacheentry);
2289             CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2290           }
2291         } else {
2292           context = NULL;
2293           status = VMI_Cache_Deregister (handle->data.send.data.stream.cacheentry);
2294           CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2295         }
2296       }
2297
2298       if (handle->data.send.send_handle_type == CMI_VMI_SEND_HANDLE_TYPE_RDMAGET) {
2299         if (CMI_VMI_Eager_Protocol) {
2300           context = CONTEXTFIELD (handle->msg);
2301           if (!context) {
2302             status = VMI_Cache_Deregister (handle->data.send.data.rdmaget.cacheentry);
2303             CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2304           }
2305         } else {
2306           context = NULL;
2307           status = VMI_Cache_Deregister (handle->data.send.data.rdmaget.cacheentry);
2308           CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2309         }
2310       }
2311
2312       if (handle->data.send.send_handle_type == CMI_VMI_SEND_HANDLE_TYPE_RDMABROADCAST) {
2313         if (CMI_VMI_Eager_Protocol) {
2314           context = CONTEXTFIELD (handle->msg);
2315           if (!context) {
2316             status = VMI_Cache_Deregister (handle->data.send.data.rdmabroadcast.cacheentry);
2317             CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2318           }
2319         } else {
2320           context = NULL;
2321           status = VMI_Cache_Deregister (handle->data.send.data.rdmabroadcast.cacheentry);
2322           CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2323         }
2324       }
2325
2326       if (handle->data.send.send_handle_type == CMI_VMI_SEND_HANDLE_TYPE_EAGER_LONG) {
2327         if (CMI_VMI_Eager_Protocol) {
2328           context = CONTEXTFIELD (handle->msg);
2329           if (!context) {
2330             status = VMI_Cache_Deregister (handle->data.send.data.eager_long.cacheentry);
2331             CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2332           }
2333         } else {
2334           context = NULL;
2335           status = VMI_Cache_Deregister (handle->data.send.data.eager_long.cacheentry);
2336           CMI_VMI_CHECK_SUCCESS (status, "VMI_Cache_Deregister()");
2337         }
2338       }
2339
2340       if (handle->data.send.message_disposition == CMI_VMI_MESSAGE_DISPOSITION_FREE) {
2341         CmiFree (handle->msg);
2342       }
2343
2344       CMI_VMI_Handle_Deallocate (handle);
2345     }
2346   }
2347 }
2348
2349
2350
2351 /**************************************************************************
2352 ** This code must call VMI_Poll() to ensure forward progress of the message
2353 ** pumping loop.
2354 */
2355 void *CmiGetNonLocal ()
2356 {
2357   VMI_STATUS status;
2358
2359   CMI_VMI_Process_T *process;
2360   CMI_VMI_Handle_T *handle;
2361
2362   int index;
2363   char *msg;
2364   CMI_VMI_Eager_Short_Slot_Footer_T *footer;
2365   int credits_temp;
2366
2367   CMI_VMI_Credit_Message_T credit_msg;
2368   PVOID addrs[1];
2369   ULONG sz[1];
2370
2371   int i;
2372
2373
2374   DEBUG_PRINT ("CmiGetNonLocal() called.\n");
2375
2376   /* Check the eager pollset to see if any new messages have arrived. */
2377   for (i = 0; i < CMI_VMI_Eager_Short_Pollset_Size; i++) {
2378     /* Get the next process in the eager short pollset. */
2379     process = CMI_VMI_Eager_Short_Pollset[i];
2380
2381     /* Examine the footer of the process's next eager short handle. */
2382     index = process->eager_short_receive_index;
2383     handle = process->eager_short_receive_handles[index];
2384     footer = handle->data.receive.data.eager_short.footer;
2385
2386     /* Get data out of the eager short handle (and any after it). */
2387     while (footer->sentinel == CMI_VMI_EAGER_SHORT_SENTINEL_DATA) {
2388       /* Get a pointer to the start of the message data. */
2389       msg = (char *) ((void *) handle->data.receive.data.eager_short.footer - footer->msgsize);
2390
2391       /* Deal with any eager send credits send with the message. */
2392       //credits_temp = CMI_VMI_MESSAGE_CREDITS (msg);
2393       //process->eager_short_send_credits_available += credits_temp;
2394
2395       /* Set up the Converse memory fields prior to the message data. */
2396       SIZEFIELD (msg) = footer->msgsize;
2397       REFFIELD (msg) = 1;
2398       CONTEXTFIELD (msg) = handle;
2399
2400       /* Mark the message footer as "received". */
2401       footer->sentinel = CMI_VMI_EAGER_SHORT_SENTINEL_RECEIVED;
2402
2403       process->eager_short_count += 1;
2404       CMI_VMI_Message_Receive_Count += 1;
2405
2406       /* Enqueue the message. */
2407       //CdsFifo_Enqueue (CpvAccess (CMI_VMI_RemoteQueue), msg);
2408       CMI_VMI_Common_Receive (process->rank, footer->msgsize, msg);
2409
2410       /* Examine the footer of the process's next eager short handle. */
2411       index = (index + 1) % process->eager_short_receive_size;
2412       process->eager_short_receive_index = index;
2413       handle = process->eager_short_receive_handles[index];
2414       footer = handle->data.receive.data.eager_short.footer;
2415     }
2416   }
2417
2418   status = VMI_Poll ();
2419   CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
2420
2421 #if GK_DELAY_DEVICE
2422   {
2423     struct timeval gk_tv;
2424     double gk_time_now;
2425     gk_delayed_msgs *gk_ptr;
2426     char *gk_msg;
2427     int gk_msgsize;
2428     int gk_sender;
2429
2430     if (gk_head_ptr1) {
2431       gettimeofday (&gk_tv, NULL);
2432       gk_time_now = gk_tv.tv_sec + (gk_tv.tv_usec * 0.000001);
2433
2434       if (gk_time_now > (gk_head_ptr1->time + gk_timeout1)) {
2435         gk_msg = gk_head_ptr1->msg;
2436         gk_msgsize = gk_head_ptr1->msgsize;
2437         gk_sender = gk_head_ptr1->sender;
2438
2439         if (gk_head_ptr1 == gk_tail_ptr1) {
2440           free (gk_head_ptr1);
2441           gk_head_ptr1 = NULL;
2442           gk_tail_ptr1 = NULL;
2443         } else {
2444           gk_ptr = gk_head_ptr1;
2445           gk_head_ptr1 = gk_head_ptr1->next;
2446           free (gk_ptr);
2447         }
2448
2449 #if CMK_BROADCAST_SPANNING_TREE
2450         if (CMI_BROADCAST_ROOT (gk_msg)) {
2451           /* Message is enqueued after send to spanning children completes. */
2452           CMI_VMI_Send_Spanning_Children (gk_msgsize, gk_msg);
2453         } else {
2454           CdsFifo_Enqueue (CpvAccess (CMI_VMI_RemoteQueue), gk_msg);
2455         }
2456 #else
2457         CdsFifo_Enqueue (CpvAccess (CMI_VMI_RemoteQueue), gk_msg);
2458 #endif
2459       }
2460     }
2461
2462     if (gk_head_ptr2) {
2463       gettimeofday (&gk_tv, NULL);
2464       gk_time_now = gk_tv.tv_sec + (gk_tv.tv_usec * 0.000001);
2465
2466       if (gk_time_now > (gk_head_ptr2->time + gk_timeout2)) {
2467         gk_msg = gk_head_ptr2->msg;
2468         gk_msgsize = gk_head_ptr2->msgsize;
2469         gk_sender = gk_head_ptr2->sender;
2470
2471         if (gk_head_ptr2 == gk_tail_ptr2) {
2472           free (gk_head_ptr2);
2473           gk_head_ptr2 = NULL;
2474           gk_tail_ptr2 = NULL;
2475         } else {
2476           gk_ptr = gk_head_ptr2;
2477           gk_head_ptr2 = gk_head_ptr2->next;
2478           free (gk_ptr);
2479         }
2480
2481         CMI_VMI_Common_Receive (gk_sender, gk_msgsize, gk_msg);
2482       }
2483     }
2484   }
2485 #endif
2486
2487   return (CdsFifo_Dequeue (CpvAccess (CMI_VMI_RemoteQueue)));
2488 }
2489
2490
2491
2492 /**************************************************************************
2493 **
2494 */
2495 void CmiProbeLatencies ()
2496 {
2497   VMI_STATUS status;
2498
2499   CMI_VMI_Latency_Vector_Request_Message_T request_msg;
2500   PVOID addrs[1];
2501   ULONG sz[1];
2502
2503   int i;
2504
2505
2506   DEBUG_PRINT ("CmiProbeLatencies() called.\n");
2507
2508   CMI_VMI_Latency_Vectors_Received = 0;
2509
2510   /* Send a latency request message to every process except ourself. */
2511   CMI_VMI_MESSAGE_TYPE (&request_msg) = CMI_VMI_MESSAGE_TYPE_LATENCY_VECTOR_REQUEST;
2512   CMI_VMI_MESSAGE_CREDITS (&request_msg) = 0;
2513
2514 #if CMK_BROADCAST_SPANNING_TREE
2515   CMI_SET_BROADCAST_ROOT (&request_msg, 0);
2516 #endif
2517
2518   addrs[0] = (PVOID) &request_msg;
2519   sz[0] = (ULONG) (sizeof (CMI_VMI_Latency_Vector_Request_Message_T));
2520
2521   for (i = 0; i < _Cmi_mype; i++) {
2522     status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, sizeof (CMI_VMI_Latency_Vector_Request_Message_T));
2523     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
2524   }
2525
2526   for (i = (_Cmi_mype + 1); i < _Cmi_numpes; i++) {
2527     status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[i])->connection, addrs, sz, 1, sizeof (CMI_VMI_Latency_Vector_Request_Message_T));
2528     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
2529   }
2530
2531   (&CMI_VMI_Processes[_Cmi_mype])->latency_vector = (unsigned long *) malloc (_Cmi_numpes * sizeof (unsigned long));
2532   for (i = 0; i < _Cmi_numpes; i++) {
2533     if (i == _Cmi_mype) {
2534       (&CMI_VMI_Processes[_Cmi_mype])->latency_vector[i] = 0;
2535     } else {
2536       (&CMI_VMI_Processes[_Cmi_mype])->latency_vector[i] = VMI_CONNECT_ONE_WAY_LATENCY ((&CMI_VMI_Processes[i])->connection);
2537     }
2538   }
2539
2540   while (CMI_VMI_Latency_Vectors_Received < (_Cmi_numpes - 1)) {
2541     status = VMI_Poll ();
2542     CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
2543   }
2544 }
2545
2546
2547
2548 /**************************************************************************
2549 **
2550 */
2551 unsigned long CmiGetLatency (int process1, int process2)
2552 {
2553   DEBUG_PRINT ("CmiGetLatency() called.\n");
2554
2555   if ((&CMI_VMI_Processes[process1])->latency_vector) {
2556     return ((&CMI_VMI_Processes[process1])->latency_vector[process2]);
2557   } else {
2558     return (CMI_VMI_LATENCY_UNKNOWN);
2559   }
2560 }
2561
2562
2563
2564 /**************************************************************************
2565 **
2566 */
2567 int CmiGetCluster (int process)
2568 {
2569   DEBUG_PRINT ("CmiGetCluster() called.\n");
2570
2571   return ((&CMI_VMI_Processes[process])->cluster);
2572 }
2573
2574
2575
2576 #if CMK_GRID_QUEUE_AVAILABLE
2577 /**************************************************************************
2578 **
2579 */
2580 int CmiGridQueueGetInterval ()
2581 {
2582   return (CMI_VMI_Grid_Queue_Interval);
2583 }
2584
2585
2586
2587 /**************************************************************************
2588 **
2589 */
2590 int CmiGridQueueGetThreshold ()
2591 {
2592   return (CMI_VMI_Grid_Queue_Threshold);
2593 }
2594
2595
2596
2597 /**************************************************************************
2598 **
2599 */
2600 void CmiGridQueueRegister (int gid, int nInts, int index1, int index2, int index3)
2601 {
2602   DEBUG_PRINT ("CmiGridQueueRegister() called.\n");
2603
2604   if ((!CMI_VMI_Grid_Queue) || (CMI_VMI_Grid_Objects_Index >= CMI_VMI_Grid_Queue_Maximum)) {
2605     return;
2606   }
2607
2608   if (CmiGridQueueLookup (gid, nInts, index1, index2, index3)) {
2609     return;
2610   }
2611
2612   CMI_VMI_Grid_Objects[CMI_VMI_Grid_Objects_Index].gid = gid;
2613   CMI_VMI_Grid_Objects[CMI_VMI_Grid_Objects_Index].nInts = nInts;
2614   CMI_VMI_Grid_Objects[CMI_VMI_Grid_Objects_Index].index1 = index1;
2615   CMI_VMI_Grid_Objects[CMI_VMI_Grid_Objects_Index].index2 = index2;
2616   CMI_VMI_Grid_Objects[CMI_VMI_Grid_Objects_Index].index3 = index3;
2617
2618   CMI_VMI_Grid_Objects_Index += 1;
2619
2620   if (CMI_VMI_Grid_Objects_Index > 1) {
2621     qsort (CMI_VMI_Grid_Objects, CMI_VMI_Grid_Objects_Index, sizeof (CMI_VMI_Grid_Object_T), CMI_VMI_Grid_Objects_Compare);
2622   }
2623 }
2624
2625
2626
2627 /**************************************************************************
2628 **
2629 */
2630 void CmiGridQueueDeregister (int gid, int nInts, int index1, int index2, int index3)
2631 {
2632   int i;
2633   int j;
2634
2635
2636   DEBUG_PRINT ("CmiGridQueueDeregister() called.\n");
2637
2638   if (!CMI_VMI_Grid_Queue || (CMI_VMI_Grid_Objects_Index == 0)) {
2639     return;
2640   }
2641
2642   for (i = 0; i < CMI_VMI_Grid_Objects_Index; i++) {
2643     if (CMI_VMI_Grid_Objects[i].gid == gid) {
2644       if ((nInts == 1) && (CMI_VMI_Grid_Objects[i].index1 == index1)) {
2645         break;
2646       }
2647       if ((nInts == 2) && ((CMI_VMI_Grid_Objects[i].index1 == index1) &&
2648                            (CMI_VMI_Grid_Objects[i].index2 == index2))) {
2649         break;
2650       }
2651       if ((nInts == 3) && ((CMI_VMI_Grid_Objects[i].index1 == index1) &&
2652                            (CMI_VMI_Grid_Objects[i].index2 == index2) &&
2653                            (CMI_VMI_Grid_Objects[i].index3 == index3))) {
2654         break;
2655       }
2656     }
2657   }
2658
2659   if (i >= CMI_VMI_Grid_Objects_Index) {
2660     return;
2661   }
2662
2663   for (j = i; j < CMI_VMI_Grid_Objects_Index; j++) {
2664     CMI_VMI_Grid_Objects[j].gid = CMI_VMI_Grid_Objects[j+1].gid;
2665     CMI_VMI_Grid_Objects[j].nInts = CMI_VMI_Grid_Objects[j+1].nInts;
2666     CMI_VMI_Grid_Objects[j].index1 = CMI_VMI_Grid_Objects[j+1].index1;
2667     CMI_VMI_Grid_Objects[j].index2 = CMI_VMI_Grid_Objects[j+1].index2;
2668     CMI_VMI_Grid_Objects[j].index3 = CMI_VMI_Grid_Objects[j+1].index3;
2669   }
2670
2671   CMI_VMI_Grid_Objects_Index -= 1;
2672 }
2673
2674
2675
2676 /**************************************************************************
2677 **
2678 */
2679 void CmiGridQueueDeregisterAll ()
2680 {
2681   DEBUG_PRINT ("CmiGridQueueDeregisterAll() called.\n");
2682
2683   CMI_VMI_Grid_Objects_Index = 0;
2684 }
2685
2686
2687
2688 /**************************************************************************
2689 **
2690 */
2691 int CmiGridQueueLookup (int gid, int nInts, int index1, int index2, int index3)
2692 {
2693   int i;
2694   void *ptr;
2695   CMI_VMI_Grid_Object_T key;
2696
2697
2698   DEBUG_PRINT ("CmiGridQueueLookup() called.\n");
2699
2700   if (!CMI_VMI_Grid_Queue || (CMI_VMI_Grid_Objects_Index == 0)) {
2701     return (0);
2702   }
2703
2704   key.gid = gid;
2705   key.nInts = nInts;
2706   key.index1 = index1;
2707   key.index2 = index2;
2708   key.index3 = index3;
2709
2710   ptr = bsearch (&key, CMI_VMI_Grid_Objects, CMI_VMI_Grid_Objects_Index, sizeof (CMI_VMI_Grid_Object_T), CMI_VMI_Grid_Objects_Compare);
2711
2712   if (ptr != NULL) {
2713     return (1);
2714   }
2715
2716   return (0);
2717
2718 /*
2719   for (i = 0; i < CMI_VMI_Grid_Objects_Index; i++) {
2720     if (CMI_VMI_Grid_Objects[i].gid == gid) {
2721       if ((nInts == 1) && (CMI_VMI_Grid_Objects[i].index1 == index1)) {
2722         return (1);
2723       }
2724       if ((nInts == 2) && ((CMI_VMI_Grid_Objects[i].index1 == index1) &&
2725                            (CMI_VMI_Grid_Objects[i].index2 == index2))) {
2726         return (1);
2727       }
2728       if ((nInts == 3) && ((CMI_VMI_Grid_Objects[i].index1 == index1) &&
2729                            (CMI_VMI_Grid_Objects[i].index2 == index2) &&
2730                            (CMI_VMI_Grid_Objects[i].index3 == index3))) {
2731         return (1);
2732       }
2733     }
2734   }
2735
2736   return (0);
2737 */
2738 }
2739
2740
2741
2742 /**************************************************************************
2743 **
2744 */
2745 int CmiGridQueueLookupMsg (char *msg)
2746 {
2747   CMI_VMI_Envelope *env;
2748
2749
2750   DEBUG_PRINT ("CmiGridQueueLookupMsg() called.\n");
2751
2752   env = (CMI_VMI_Envelope *) msg;
2753   if (env->s_attribs.mtype == 16) {
2754     return (CmiGridQueueLookup (env->u_type.array.arr, env->u_type.array.index.nInts,
2755                                 env->u_type.array.index.index[0], env->u_type.array.index.index[1], env->u_type.array.index.index[2]));
2756   }
2757   return (0);
2758 }
2759
2760
2761 /**************************************************************************
2762 **
2763 */
2764 int CMI_VMI_Grid_Objects_Compare (const void *ptr1, const void *ptr2)
2765 {
2766   CMI_VMI_Grid_Object_T *obj1;
2767   CMI_VMI_Grid_Object_T *obj2;
2768
2769
2770   obj1 = (CMI_VMI_Grid_Object_T *) ptr1;
2771   obj2 = (CMI_VMI_Grid_Object_T *) ptr2;
2772
2773   if (obj1->gid < obj2->gid) {
2774     return (-1);
2775   }
2776
2777   if (obj1->gid > obj2->gid) {
2778     return (1);
2779   }
2780
2781   /*
2782     At this point, obj1->gid == obj2->gid.
2783     This implies that obj1->nInts == obj2->nInts.
2784   */
2785
2786   if (obj1->nInts != obj2->nInts) {
2787     CmiAbort ("Invalid data stored in Grid Queue lookup table.");
2788   }
2789
2790   if (obj1->nInts == 1) {
2791     if (obj1->index1 < obj2->index1) {
2792       return (-1);
2793     }
2794
2795     if (obj1->index1 > obj2->index1) {
2796       return (1);
2797     }
2798
2799     return (0);
2800   }
2801
2802   if (obj1->nInts == 2) {
2803     if (obj1->index1 < obj2->index1) {
2804       return (-1);
2805     }
2806
2807     if (obj1->index1 > obj2->index1) {
2808       return (1);
2809     }
2810
2811     /* At this point, obj1->index1 == obj2->index1. */
2812
2813     if (obj1->index2 < obj2->index2) {
2814       return (-1);
2815     }
2816
2817     if (obj1->index2 > obj2->index2) {
2818       return (1);
2819     }
2820
2821     return (0);
2822   }
2823
2824   if (obj1->nInts == 3) {
2825     if (obj1->index1 < obj2->index1) {
2826       return (-1);
2827     }
2828
2829     if (obj1->index1 > obj2->index1) {
2830       return (1);
2831     }
2832
2833     /* At this point, obj1->index1 == obj2->index1. */
2834
2835     if (obj1->index2 < obj2->index2) {
2836       return (-1);
2837     }
2838
2839     if (obj1->index2 > obj2->index2) {
2840       return (1);
2841     }
2842
2843     /* At this point, obj1->index2 == obj2->index2. */
2844
2845     if (obj1->index3 < obj2->index3) {
2846       return (-1);
2847     }
2848
2849     if (obj1->index3 > obj2->index3) {
2850       return (1);
2851     }
2852
2853     return (0);
2854   }
2855
2856   CmiAbort ("Invalid data stored in Grid Queue lookup table.");
2857 }
2858 #endif
2859
2860
2861
2862 #if CMK_PERSISTENT_COMM
2863 /**************************************************************************
2864 ** done
2865 */
2866 void CmiPersistentInit ()
2867 {
2868   DEBUG_PRINT ("CmiPersistentInit() called.\n");
2869 }
2870
2871
2872
2873 /**************************************************************************
2874 ** done
2875 */
2876 PersistentHandle CmiCreatePersistent (int destrank, int maxsize)
2877 {
2878   VMI_STATUS status;
2879
2880   CMI_VMI_Persistent_Request_Message_T request_msg;
2881   PVOID addrs[1];
2882   ULONG sz[1];
2883
2884
2885   DEBUG_PRINT ("CmiCreatePersistent() called.\n");
2886
2887   if (CMI_VMI_Eager_Protocol) {
2888     CMI_VMI_MESSAGE_TYPE (&request_msg) = CMI_VMI_MESSAGE_TYPE_PERSISTENT_REQUEST;
2889     CMI_VMI_MESSAGE_CREDITS (&request_msg) = 0;
2890
2891 #if CMK_BROADCAST_SPANNING_TREE
2892     CMI_SET_BROADCAST_ROOT (&request_msg, 0);
2893 #endif
2894
2895     request_msg.maxsize = maxsize;
2896
2897     addrs[0] = (PVOID) &request_msg;
2898     sz[0] = (ULONG) (sizeof (CMI_VMI_Persistent_Request_Message_T));
2899
2900     status = VMI_Stream_Send_Inline ((&CMI_VMI_Processes[destrank])->connection, addrs, sz, 1, sizeof (CMI_VMI_Persistent_Request_Message_T));
2901     CMI_VMI_CHECK_SUCCESS (status, "VMI_Stream_Send_Inline()");
2902   }
2903
2904   return ((PersistentHandle) NULL);
2905 }
2906
2907
2908
2909 /**************************************************************************
2910 ** done
2911 */
2912 void CmiUsePersistentHandle (PersistentHandle *handle_array, int array_size)
2913 {
2914   DEBUG_PRINT ("CmiUserPersistentHandle() called.\n");
2915 }
2916
2917
2918
2919 /**************************************************************************
2920 ** done
2921 */
2922 void CmiDestroyPersistent (PersistentHandle phandle)
2923 {
2924   DEBUG_PRINT ("CmiDestroyPersistent() called.\n");
2925 }
2926
2927
2928
2929 /**************************************************************************
2930 ** done
2931 */
2932 void CmiDestroyAllPersistent ()
2933 {
2934   DEBUG_PRINT ("CmiDestroyAllPersistent() called.\n");
2935 }
2936
2937
2938
2939 /**************************************************************************
2940 **
2941 */
2942 PersistentReq CmiCreateReceiverPersistent (int maxsize)
2943 {
2944   PersistentReq request;
2945
2946
2947   DEBUG_PRINT ("CmiCreateReceiverPersistent() called.\n");
2948
2949   request.pe = _Cmi_mype;
2950   request.maxBytes = maxsize;
2951
2952   return (request);
2953 }
2954
2955
2956
2957 /**************************************************************************
2958 **
2959 */
2960 PersistentHandle CmiRegisterReceivePersistent (PersistentReq request)
2961 {
2962   DEBUG_PRINT ("CmiRegisterReceivePersistent() called.\n");
2963
2964   if (CMI_VMI_Eager_Protocol) {
2965     CMI_VMI_Eager_Short_Setup (request.pe);
2966
2967     if (request.maxBytes > CMI_VMI_Eager_Short_Message_Boundary) {
2968       if (request.maxBytes < CMI_VMI_Eager_Long_Buffer_Size) {
2969         CMI_VMI_Eager_Long_Setup (request.pe, CMI_VMI_Eager_Long_Buffer_Size);
2970       } else {
2971         CMI_VMI_Eager_Long_Setup (request.pe, request.maxBytes);
2972       }
2973     }
2974   }
2975
2976   return ((PersistentHandle) NULL);
2977 }
2978 #endif   /* CMK_PERSISTENT_COMM */
2979
2980
2981
2982 /**************************************************************************
2983 **
2984 */
2985 void CMI_VMI_Read_Environment ()
2986 {
2987   char *value;
2988
2989   int dummy1;
2990   int dummy2;
2991
2992
2993   DEBUG_PRINT ("CMI_VMI_Read_Environment() called.\n");
2994
2995   /* Get the username for this process. */
2996   if (value = (getpwuid (getuid ()))->pw_name) {
2997     if (!(CMI_VMI_Username = strdup (value))) {
2998       CmiAbort ("Unable to allocate memory for the username.");
2999     }
3000   } else {
3001     CmiAbort ("Unable to get the username for this process.");
3002   }
3003
3004   /* Get the program key. */
3005   if (value = getenv ("VMI_KEY")) {
3006     /* Free the default value established in ConverseInit() from argv[0]. */
3007     free (CMI_VMI_Program_Key);
3008
3009     if (!(CMI_VMI_Program_Key = strdup (value))) {
3010       CmiAbort ("Unable to allocate memory for the program key.");
3011     }
3012   }
3013
3014   /* Get the total number of processes in the computation. */
3015   if (value = getenv ("VMI_PROCS")) {
3016     _Cmi_numpes = atoi (value);
3017   } else {
3018     CmiAbort ("Unable to determine the number of processes in the computation (VMI_PROCS).");
3019   }
3020
3021   /* Get parameters for runtime behavior that override default values. */
3022   if (value = getenv ("CMI_VMI_WAN_LATENCY")) {
3023     CMI_VMI_WAN_Latency = atoi (value);
3024   }
3025
3026   if (value = getenv ("CMI_VMI_CLUSTER")) {
3027     CMI_VMI_Cluster = atoi (value);
3028   }
3029
3030   if (value = getenv ("CMI_VMI_PROBE_CLUSTERS")) {
3031     CMI_VMI_Probe_Clusters = atoi (value);
3032   }
3033
3034 #if CMK_GRID_QUEUE_AVAILABLE
3035   if (value = getenv ("CMI_VMI_GRID_QUEUE")) {
3036     CMI_VMI_Grid_Queue = atoi (value);
3037   }
3038
3039   if (value = getenv ("CMI_VMI_GRID_QUEUE_MAXIMUM")) {
3040     CMI_VMI_Grid_Queue_Maximum = atoi (value);
3041   }
3042
3043   if (value = getenv ("CMI_VMI_GRID_QUEUE_INTERVAL")) {
3044     CMI_VMI_Grid_Queue_Interval = atoi (value);
3045   }
3046
3047   if (value = getenv ("CMI_VMI_GRID_QUEUE_THRESHOLD")) {
3048     CMI_VMI_Grid_Queue_Threshold = atoi (value);
3049   }
3050 #endif
3051
3052   if (value = getenv ("CMI_VMI_MEMORY_POOL")) {
3053     CMI_VMI_Memory_Pool = atoi (value);
3054   }
3055
3056   if (value = getenv ("CMI_VMI_TERMINATE_VMI_HACK")) {
3057     CMI_VMI_Terminate_VMI_Hack = atoi (value);
3058   }
3059
3060   if (value = getenv ("CMI_VMI_CONNECTION_TIMEOUT")) {
3061     CMI_VMI_Connection_Timeout = atoi (value);
3062   }
3063
3064   if (value = getenv ("CMI_VMI_MAXIMUM_HANDLES")) {
3065     CMI_VMI_Maximum_Handles = atoi (value);
3066   }
3067
3068   if (value = getenv ("CMI_VMI_SMALL_MESSAGE_BOUNDARY")) {
3069     CMI_VMI_Small_Message_Boundary = atoi (value);
3070   }
3071
3072   if (value = getenv ("CMI_VMI_MEDIUM_MESSAGE_BOUNDARY")) {
3073     CMI_VMI_Medium_Message_Boundary = atoi (value);
3074   }
3075
3076   if (value = getenv ("CMI_VMI_EAGER_PROTOCOL")) {
3077     CMI_VMI_Eager_Protocol = atoi (value);
3078   }
3079
3080   if (value = getenv ("CMI_VMI_EAGER_INTERVAL")) {
3081     CMI_VMI_Eager_Interval = atoi (value);
3082   }
3083
3084   if (value = getenv ("CMI_VMI_EAGER_THRESHOLD")) {
3085     CMI_VMI_Eager_Threshold = atoi (value);
3086   }
3087
3088   if (value = getenv ("CMI_VMI_EAGER_SHORT_POLLSET_SIZE_MAXIMUM")) {
3089     CMI_VMI_Eager_Short_Pollset_Size_Maximum = atoi (value);
3090   }
3091
3092   if (value = getenv ("CMI_VMI_EAGER_SHORT_SLOTS")) {
3093     CMI_VMI_Eager_Short_Slots = atoi (value);
3094   }
3095
3096   if (value = getenv ("CMI_VMI_EAGER_SHORT_MESSAGE_BOUNDARY")) {
3097     CMI_VMI_Eager_Short_Message_Boundary = atoi (value);
3098
3099     /*
3100       If the eager short message boundary is greater than 65536
3101       bytes, reset it to 65536.  This is because the eager short
3102       footer sends the eager short msgsize as an unsigned short
3103       which is limited to a maximum value of 65536.
3104     */
3105     if (CMI_VMI_Eager_Short_Message_Boundary > 65536) {
3106       CMI_VMI_Eager_Short_Message_Boundary = 65536;
3107     }
3108   }
3109
3110   if (value = getenv ("CMI_VMI_EAGER_LONG_BUFFERS")) {
3111     CMI_VMI_Eager_Long_Buffers = atoi (value);
3112   }
3113
3114   if (value = getenv ("CMI_VMI_EAGER_LONG_BUFFER_SIZE")) {
3115     CMI_VMI_Eager_Long_Buffer_Size = atoi (value);
3116   }
3117
3118   /* Figure out the startup type. */
3119   value = getenv ("CRM");
3120   if (value) {
3121     CMI_VMI_Startup_Type = CMI_VMI_STARTUP_TYPE_CRM;
3122     if (strstr (value, ":")) {
3123       CMI_VMI_CRM_Hostname = strdup (value);
3124       dummy1 = 0;
3125       while (CMI_VMI_CRM_Hostname[dummy1] != ':') {
3126         dummy1 += 1;
3127       }
3128       CMI_VMI_CRM_Hostname[dummy1] = 0;
3129       CMI_VMI_CRM_Port = atoi (value + dummy1 + 1);
3130     } else {
3131       CMI_VMI_CRM_Hostname = strdup (value);
3132       CMI_VMI_CRM_Port = CMI_VMI_CRM_PORT;
3133     }
3134     return;
3135   }
3136
3137   value = getenv ("NETSTART");
3138   if (value) {
3139     CMI_VMI_Startup_Type = CMI_VMI_STARTUP_TYPE_CHARMRUN;
3140     sscanf (value, "%d%s%d%d%d", &_Cmi_mype, CMI_VMI_Charmrun_IP, &CMI_VMI_Charmrun_Port, &dummy1, &dummy2);
3141     return;
3142   }
3143 }
3144
3145
3146
3147 /**************************************************************************
3148 **
3149 */
3150 int CMI_VMI_Startup_CRM ()
3151 {
3152   pid_t myPID;
3153
3154   struct hostent *host_ent;
3155
3156   struct sockaddr_in serv_addr;
3157   int rc;
3158
3159   CMI_VMI_CRM_Register_Message_T msg_register;
3160
3161   int msg_code;
3162   int msg_error;
3163
3164   int msg_numpes;
3165
3166   CMI_VMI_CRM_Nodeblock_Message_T *msg_nodeblock;
3167
3168   char crm_ip[1024];
3169
3170   struct sockaddr_in local;
3171   socklen_t sockaddr_len;
3172   int myIP;
3173
3174   int i;
3175
3176
3177   DEBUG_PRINT ("CMI_VMI_Startup_CRM() called.\n");
3178
3179   myPID = getpid ();
3180
3181   CMI_VMI_CRM_Socket = socket (AF_INET, SOCK_STREAM, 0);
3182   if (CMI_VMI_CRM_Socket < 0) {
3183     DEBUG_PRINT ("Error opening socket to CRM.\n");
3184     return (-1);
3185   }
3186
3187   host_ent = gethostbyname (CMI_VMI_CRM_Hostname);
3188   if (!host_ent) {
3189     DEBUG_PRINT ("Error in gethostbyname() while contacting CRM.\n");
3190     return (-1);
3191   }
3192
3193   strcpy (crm_ip, inet_ntoa (*((struct in_addr *) host_ent->h_addr_list[0])));
3194
3195   //memset ((void *) &serv_addr, 0, sizeof (serv_addr));
3196   memset ((void *) &serv_addr, 0, sizeof (struct sockaddr_in));
3197   serv_addr.sin_family = AF_INET;
3198   serv_addr.sin_addr.s_addr = inet_addr (crm_ip);
3199   serv_addr.sin_port = htons (CMI_VMI_CRM_Port);
3200
3201   //rc = connect (CMI_VMI_CRM_Socket, (struct sockaddr *) &serv_addr, sizeof (serv_addr));
3202   rc = connect (CMI_VMI_CRM_Socket, (struct sockaddr *) &serv_addr, sizeof (struct sockaddr_in));
3203   if (rc < 0) {
3204     DEBUG_PRINT ("Error connecting to CRM.\n");
3205     return (rc);
3206   }
3207
3208   memset ((void *) &local, 0, sizeof (struct sockaddr_in));
3209   sockaddr_len = sizeof (struct sockaddr_in);
3210   rc = getsockname (CMI_VMI_CRM_Socket, (struct sockaddr *) &local, &sockaddr_len);
3211   if (rc < 0) {
3212     DEBUG_PRINT ("Error getting local TCP/IP address while synchronizing with CRM.\n");
3213     close (CMI_VMI_CRM_Socket);
3214     return (rc);
3215   }
3216   myIP = (int) local.sin_addr.s_addr;
3217
3218   msg_code = htonl (CMI_VMI_CRM_MESSAGE_REGISTER);
3219
3220   rc = CMI_VMI_Socket_Send (CMI_VMI_CRM_Socket, (const void *) &msg_code, sizeof (int));
3221   if (rc < 0) {
3222     DEBUG_PRINT ("Error sending to CRM.\n");
3223     close (CMI_VMI_CRM_Socket);
3224     return (rc);
3225   }
3226
3227   msg_register.numpes = htonl (_Cmi_numpes);
3228   msg_register.cluster = htonl (CMI_VMI_Cluster);
3229   msg_register.node_context = htonl (myPID);
3230   msg_register.key_length = htonl (strlen (CMI_VMI_Program_Key));
3231   strcpy ((char *) &msg_register.key, CMI_VMI_Program_Key);
3232
3233   rc = CMI_VMI_Socket_Send (CMI_VMI_CRM_Socket, (const void *) &msg_register, ((4 * sizeof (int)) + strlen (CMI_VMI_Program_Key)));
3234   if (rc < 0) {
3235     DEBUG_PRINT ("Error sending to CRM.\n");
3236     close (CMI_VMI_CRM_Socket);
3237     return (rc);
3238   }
3239
3240   rc = CMI_VMI_Socket_Receive (CMI_VMI_CRM_Socket, &msg_code, sizeof (int));
3241   if (rc < 0) {
3242     DEBUG_PRINT ("Error receiveing from CRM.\n");
3243     close (CMI_VMI_CRM_Socket);
3244     return (rc);
3245   }
3246
3247   msg_code = ntohl (msg_code);
3248
3249   switch (msg_code)
3250   {
3251     case CMI_VMI_CRM_MESSAGE_SUCCESS:
3252       rc = CMI_VMI_Socket_Receive (CMI_VMI_CRM_Socket, &msg_numpes, sizeof (int));
3253       if (rc < 0) {
3254         DEBUG_PRINT ("Error receiveing from CRM.\n");
3255         close (CMI_VMI_CRM_Socket);
3256         return (rc);
3257       }
3258
3259       msg_numpes = ntohl (msg_numpes);
3260
3261       msg_nodeblock = malloc (msg_numpes * sizeof (CMI_VMI_CRM_Nodeblock_Message_T));
3262       if (!msg_nodeblock) {
3263         DEBUG_PRINT ("Unable to allocate memory to receive nodeblock from CRM.\n");
3264         close (CMI_VMI_CRM_Socket);
3265         return (-1);
3266       }
3267
3268       rc = CMI_VMI_Socket_Receive (CMI_VMI_CRM_Socket, msg_nodeblock, msg_numpes * sizeof (CMI_VMI_CRM_Nodeblock_Message_T));
3269       if (rc < 0) {
3270         DEBUG_PRINT ("Error receiveing from CRM.\n");
3271         close (CMI_VMI_CRM_Socket);
3272         return (rc);
3273       }
3274
3275       _Cmi_mype = -1;
3276       for (i = 0; i < msg_numpes; i++) {
3277         (&CMI_VMI_Processes[i])->rank = i;
3278         (&CMI_VMI_Processes[i])->node_IP = (&msg_nodeblock[i])->node_IP;
3279         (&CMI_VMI_Processes[i])->cluster = ntohl ((&msg_nodeblock[i])->cluster);
3280
3281         (&msg_nodeblock[i])->node_context = ntohl ((&msg_nodeblock[i])->node_context);
3282
3283         if (((&msg_nodeblock[i])->node_IP == myIP) && ((&msg_nodeblock[i])->node_context) == myPID) {
3284           _Cmi_mype = i;
3285         }
3286       }
3287
3288       free (msg_nodeblock);
3289
3290       close (CMI_VMI_CRM_Socket);
3291
3292       return (0);
3293
3294       break;
3295
3296     case CMI_VMI_CRM_MESSAGE_FAILURE:
3297       rc = CMI_VMI_Socket_Receive (CMI_VMI_CRM_Socket, &msg_error, sizeof (int));
3298       if (rc < 0) {
3299         DEBUG_PRINT ("Error receiveing from CRM.\n");
3300         close (CMI_VMI_CRM_Socket);
3301         return (rc);
3302       }
3303
3304       msg_error = ntohl (msg_error);
3305
3306       switch (msg_error)
3307       {
3308         case CMI_VMI_CRM_ERROR_CONFLICT:
3309           CmiPrintf ("Error synchronizing with CRM (key/# PE conflict).\n");
3310           break;
3311
3312         case CMI_VMI_CRM_ERROR_TIMEOUT:
3313           CmiPrintf ("Error synchronizing with CRM (timeout).\n");
3314           break;
3315
3316         default:
3317           CmiPrintf ("Error synchronizing with CRM (unknown problem).\n");
3318           break;
3319       }
3320
3321       return (-1);
3322
3323       close (CMI_VMI_CRM_Socket);
3324
3325       break;
3326
3327     default:
3328       printf ("Unknown message code received from CRM.\n");
3329       return (-1);
3330       break;
3331   }
3332
3333   /* Should never get here! */
3334   return (-2);
3335 }
3336
3337
3338
3339 /**************************************************************************
3340 **
3341 */
3342 int CMI_VMI_Startup_Charmrun ()
3343 {
3344   pid_t myPID;
3345
3346   CMI_VMI_Charmrun_Message_Header_T hdr;
3347   CMI_VMI_Charmrun_Register_Message_T msg_register;
3348   int msg_numnodes;
3349   CMI_VMI_Charmrun_Nodeblock_Message_T *msg_nodeblock;
3350
3351   struct sockaddr_in serv_addr;
3352   int rc;
3353
3354   int i;
3355   int j;
3356
3357
3358   DEBUG_PRINT ("CMI_VMI_Startup_Charmrun() called.\n");
3359
3360   myPID = getpid ();
3361
3362   CMI_VMI_Charmrun_Socket = socket (AF_INET, SOCK_STREAM, 0);
3363   if (CMI_VMI_Charmrun_Socket < 0) {
3364     DEBUG_PRINT ("Error opening socket to charmrun.\n");
3365     return (-1);
3366   }
3367
3368   //bzero ((char *) &serv_addr, sizeof (serv_addr));
3369   memset ((void *) &serv_addr, 0, sizeof (struct sockaddr_in));
3370   serv_addr.sin_family = AF_INET;
3371   serv_addr.sin_addr.s_addr = inet_addr (CMI_VMI_Charmrun_IP);
3372   serv_addr.sin_port = htons (CMI_VMI_Charmrun_Port);
3373
3374   rc = connect (CMI_VMI_Charmrun_Socket, (struct sockaddr *) &serv_addr, sizeof (serv_addr));
3375   if (rc < 0) {
3376     DEBUG_PRINT ("Error connecting to charmrun.\n");
3377     return (rc);
3378   }
3379
3380   hdr.msg_len = htonl (sizeof (CMI_VMI_Charmrun_Register_Message_T));
3381   strcpy (hdr.msg_type, "initnode");
3382
3383   msg_register.node_number = htonl (_Cmi_mype);   /* the rank of this PE */
3384   msg_register.numpes = htonl (0);                /* ignored */
3385   msg_register.dataport = htonl (myPID);          /* not used by vmi-linux -- must not be 0! */
3386   msg_register.mach_id = htonl (0);               /* not used by vmi-linux */
3387   msg_register.node_IP = htonl (0);               /* ignored */
3388
3389   rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, (const void *) &hdr, sizeof (CMI_VMI_Charmrun_Message_Header_T));
3390   if (rc < 0) {
3391     DEBUG_PRINT ("Error sending to charmrun.\n");
3392     return (rc);
3393   }
3394   rc = CMI_VMI_Socket_Send (CMI_VMI_Charmrun_Socket, (const void *) &msg_register, sizeof (CMI_VMI_Charmrun_Register_Message_T));
3395   if (rc < 0) {
3396     DEBUG_PRINT ("Error sending to charmrun.\n");
3397     return (rc);
3398   }
3399
3400   rc = CMI_VMI_Socket_Receive (CMI_VMI_Charmrun_Socket, (void *) &hdr, sizeof (CMI_VMI_Charmrun_Message_Header_T));
3401   if (rc < 0) {
3402     DEBUG_PRINT ("Error receiving from charmrun.\n");
3403     return (rc);
3404   }
3405   rc = CMI_VMI_Socket_Receive (CMI_VMI_Charmrun_Socket, (void *) &msg_numnodes, sizeof (int));
3406   if (rc < 0) {
3407     DEBUG_PRINT ("Error receiving from charmrun.\n");
3408     return (rc);
3409   }
3410
3411   msg_numnodes = ntohl (msg_numnodes);
3412
3413   msg_nodeblock = (CMI_VMI_Charmrun_Nodeblock_Message_T *) malloc (msg_numnodes * sizeof (CMI_VMI_Charmrun_Nodeblock_Message_T));
3414
3415   rc = CMI_VMI_Socket_Receive (CMI_VMI_Charmrun_Socket, (void *) msg_nodeblock, (msg_numnodes * sizeof (CMI_VMI_Charmrun_Nodeblock_Message_T)));
3416   if (rc < 0) {
3417     DEBUG_PRINT ("Error receiving from charmrun.\n");
3418     return (rc);
3419   }
3420
3421   for (i = 0; i < msg_numnodes; i++) {
3422     (&CMI_VMI_Processes[i])->node_IP = (&msg_nodeblock[i])->node_IP;
3423     (&CMI_VMI_Processes[i])->rank = i;
3424   }
3425
3426   free (msg_nodeblock);
3427
3428   /* Return successfully. */
3429   return (0);
3430 }
3431
3432
3433
3434 /**************************************************************************
3435 ** This function initializes VMI.  It assumes that we know our rank in the
3436 ** computation (i.e., _Cmi_mype is set).
3437 **
3438 ** We need a unique VMI key for each process, so we use
3439 ** "[syncronization key]:[process rank]" for each processes's key.
3440 ** This enables us to figure out each process's key later when we connect
3441 ** to that process.
3442 */
3443 int CMI_VMI_Initialize_VMI ()
3444 {
3445   VMI_STATUS status;
3446
3447   char *vmi_key;
3448   char *vmi_inlined_data_size;
3449
3450
3451   DEBUG_PRINT ("CMI_VMI_Initialize_VMI() called.\n");
3452
3453   /* Set the VMI_KEY environment variable. */
3454   vmi_key = (char *) malloc ((strlen (CMI_VMI_Program_Key)) + 32);
3455   if (!vmi_key) {
3456     DEBUG_PRINT ("Unable to allocate memory for VMI key.");
3457     return (-1);
3458   }
3459
3460   sprintf (vmi_key, "VMI_KEY=%s:%d\0", CMI_VMI_Program_Key, _Cmi_mype);
3461
3462   if (putenv (vmi_key) < 0) {
3463     DEBUG_PRINT ("Unable to set VMI_KEY environment variable.");
3464     return (-1);
3465   }
3466
3467   /* Set the maximum size of inlined stream messages. */
3468   vmi_inlined_data_size = (char *) malloc (32);
3469   if (!vmi_inlined_data_size) {
3470     DEBUG_PRINT ("Unable to allocate memory for VMI inlined data size.");
3471     return (-1);
3472   }
3473
3474   sprintf (vmi_inlined_data_size, "VMI_INLINED_DATA_SZ=%d\0", CMI_VMI_Medium_Message_Boundary);
3475
3476   if (putenv (vmi_inlined_data_size) < 0) {
3477     DEBUG_PRINT ("Unable to set VMI_INLINED_DATA_SZ environment variable.");
3478     return (-1);
3479   }
3480
3481   DEBUG_PRINT ("Initializing VMI with key %s.\n", vmi_key);
3482
3483   /* Initialize VMI. */
3484   status = VMI_Init (0, NULL);
3485   CMI_VMI_CHECK_SUCCESS (status, "VMI_Init()");
3486
3487   /* Set a connection accept function. */
3488   status = VMI_Connection_Accept_Fn (CMI_VMI_Connection_Handler);
3489   CMI_VMI_CHECK_SUCCESS (status, "VMI_Connection_Accept_Fn()");
3490
3491   /* Set a connection disconnect function. */
3492   status = VMI_Connection_Disconnect_Fn (CMI_VMI_Disconnection_Handler);
3493   CMI_VMI_CHECK_SUCCESS (status, "VMI_Connection_Disconnect_Fn()");
3494
3495   /* Set a stream receive function. */
3496   VMI_STREAM_SET_RECV_FUNCTION (CMI_VMI_Stream_Notification_Handler);
3497
3498   /* Create buffer pools. */
3499   if (CMI_VMI_Memory_Pool) {
3500     status = VMI_Pool_Create_Buffer_Pool (CMI_VMI_BUCKET1_SIZE, sizeof (PVOID), CMI_VMI_BUCKET1_PREALLOCATE,
3501                                           CMI_VMI_BUCKET1_GROW, VMI_POOL_CLEARONCE, &CMI_VMI_Bucket1_Pool);
3502     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Create_Buffer_Pool()");
3503
3504     status = VMI_Pool_Create_Buffer_Pool (CMI_VMI_BUCKET2_SIZE, sizeof (PVOID), CMI_VMI_BUCKET2_PREALLOCATE,
3505                                           CMI_VMI_BUCKET2_GROW, VMI_POOL_CLEARONCE, &CMI_VMI_Bucket2_Pool);
3506     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Create_Buffer_Pool()");
3507
3508     status = VMI_Pool_Create_Buffer_Pool (CMI_VMI_BUCKET3_SIZE, sizeof (PVOID), CMI_VMI_BUCKET3_PREALLOCATE,
3509                                           CMI_VMI_BUCKET3_GROW, VMI_POOL_CLEARONCE, &CMI_VMI_Bucket3_Pool);
3510     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Create_Buffer_Pool()");
3511
3512     status = VMI_Pool_Create_Buffer_Pool (CMI_VMI_BUCKET4_SIZE, sizeof (PVOID), CMI_VMI_BUCKET4_PREALLOCATE,
3513                                           CMI_VMI_BUCKET4_GROW, VMI_POOL_CLEARONCE, &CMI_VMI_Bucket4_Pool);
3514     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Create_Buffer_Pool()");
3515
3516     status = VMI_Pool_Create_Buffer_Pool (CMI_VMI_BUCKET5_SIZE, sizeof (PVOID), CMI_VMI_BUCKET5_PREALLOCATE,
3517                                           CMI_VMI_BUCKET5_GROW, VMI_POOL_CLEARONCE, &CMI_VMI_Bucket5_Pool);
3518     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Create_Buffer_Pool()");
3519   }
3520
3521   /* Free memory. */
3522   free (vmi_inlined_data_size);
3523   free (vmi_key);
3524
3525   /* Return successfully. */
3526   return (0);
3527 }
3528
3529
3530
3531 /**************************************************************************
3532 **
3533 */
3534 int CMI_VMI_Terminate_VMI ()
3535 {
3536   VMI_STATUS status;
3537
3538
3539   DEBUG_PRINT ("CMI_VMI_Terminate_VMI() called.\n");
3540
3541   /* Release memory used by buffer pools. */
3542   if (CMI_VMI_Memory_Pool) {
3543     status = VMI_Pool_Destroy_Buffer_Pool (CMI_VMI_Bucket1_Pool);
3544     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Destroy_Buffer_Pool()");
3545
3546     status = VMI_Pool_Destroy_Buffer_Pool (CMI_VMI_Bucket2_Pool);
3547     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Destroy_Buffer_Pool()");
3548
3549     status = VMI_Pool_Destroy_Buffer_Pool (CMI_VMI_Bucket3_Pool);
3550     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Destroy_Buffer_Pool()");
3551
3552     status = VMI_Pool_Destroy_Buffer_Pool (CMI_VMI_Bucket4_Pool);
3553     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Destroy_Buffer_Pool()");
3554
3555     status = VMI_Pool_Destroy_Buffer_Pool (CMI_VMI_Bucket5_Pool);
3556     CMI_VMI_CHECK_SUCCESS (status, "VMI_Pool_Destroy_Buffer_Pool()");
3557   }
3558
3559   /* Terminate VMI. */
3560   SET_VMI_SUCCESS (status);
3561   VMI_Terminate (status);
3562
3563
3564   /* Return successfully. */
3565   return (0);
3566 }
3567
3568
3569
3570 /**************************************************************************
3571 **
3572 */
3573 int CMI_VMI_Socket_Send (int sockfd, const void *msg, int size)
3574 {
3575   int sent;
3576   int rc;
3577
3578
3579   DEBUG_PRINT ("CMI_VMI_Socket_Send() called.\n");
3580
3581   sent = 0;
3582   while (sent < size) {
3583     rc = send (sockfd, (const void *) (msg + sent), (size - sent), 0);
3584     if (rc < 0) {
3585       return (rc);
3586     } else {
3587       sent += rc;
3588     }
3589   }
3590
3591   return (sent);
3592 }
3593
3594
3595
3596 /**************************************************************************
3597 **
3598 */
3599 int CMI_VMI_Socket_Receive (int sockfd, void *msg, int size)
3600 {
3601   int received;
3602   int rc;
3603
3604
3605   DEBUG_PRINT ("CMI_VMI_Socket_Receive() called.\n");
3606
3607   received = 0;
3608   while (received < size) {
3609     rc = recv (sockfd, (void *) (msg + received), (size - received), 0);
3610     if (rc < 0) {
3611       return (rc);
3612     } else {
3613       received += rc;
3614     }
3615   }
3616
3617   return (received);
3618 }
3619
3620
3621
3622 /**************************************************************************
3623 ** Set up connections between all processes.
3624 **
3625 ** Issue connection requests to all processes with a rank lower than our
3626 ** rank and wait for connection requests from processes with a rank higher
3627 ** than our rank.
3628 **
3629 ** Each connection request contains connect data containing the rank of
3630 ** the process attempting the connection.  That way the current process
3631 ** can store the connection information in the correct slot in the
3632 ** CMI_VMI_ProcessList[].
3633 */
3634 int CMI_VMI_Open_Connections ()
3635 {
3636   VMI_STATUS status;
3637
3638   char *remote_key;
3639
3640   PVMI_BUFFER connect_message_buffer;
3641   CMI_VMI_Connect_Message_T *connect_message_data;
3642
3643   int i;
3644
3645   //CMI_VMI_Process_T *process;
3646
3647   struct timeval tp;
3648   long start_time;
3649   long now_time;
3650   BOOLEAN pending;
3651
3652
3653   DEBUG_PRINT ("CMI_VMI_Open_Connections() called.\n");
3654
3655   /* Allocate space for the remote key. */
3656   remote_key = malloc ((strlen (CMI_VMI_Program_Key)) + 32);
3657   if (!remote_key) {
3658     DEBUG_PRINT ("Unable to allocate memory for remote key.\n");
3659     return (-1);
3660   }
3661
3662   /* Allocate a buffer for connection message. */
3663   status = VMI_Buffer_Allocate (sizeof (CMI_VMI_Connect_Message_T), &connect_message_buffer);
3664   if (!VMI_SUCCESS (status)) {
3665     DEBUG_PRINT ("Unable to allocate connection message buffer.\n");
3666     free (remote_key);
3667     return (-1);
3668   }
3669
3670   /* Set up the connection message field. */
3671   connect_message_data = (CMI_VMI_Connect_Message_T *) VMI_BUFFER_ADDRESS (connect_message_buffer);
3672   connect_message_data->rank = htonl (_Cmi_mype);
3673
3674   /* Open connections to every process with a lower rank than ours. */
3675   for (i = 0; i < _Cmi_mype; i++) {
3676     /* Construct a remote VMI key in terms of the program key and peer's rank */
3677     sprintf (remote_key, "%s:%u\0", CMI_VMI_Program_Key, (&CMI_VMI_Processes[i])->rank);
3678
3679     CMI_VMI_Open_Connection (i, remote_key, connect_message_buffer);
3680
3681     DEBUG_PRINT ("Issued a connection to process %d:\n", i);
3682     DEBUG_PRINT ("\tRank - %d\n", (&CMI_VMI_Processes[i])->rank);
3683     DEBUG_PRINT ("\tKey - %s\n", remote_key);
3684     DEBUG_PRINT ("\tHostname - %s\n", remote_host->h_name);
3685     DEBUG_PRINT ("\tIP - [%d.%d.%d.%d].\n", ((process->node_IP >>  0) & 0xFF),
3686                                             ((process->node_IP >>  8) & 0xFF),
3687                                             ((process->node_IP >> 16) & 0xFF),
3688                                             ((process->node_IP >> 24) & 0xFF));
3689   }
3690
3691   /* Set the connection state to ourself to "connected". */
3692   (&CMI_VMI_Processes[_Cmi_mype])->connection_state = CMI_VMI_CONNECTION_CONNECTED;
3693
3694   /* Wait for connections.  */
3695   gettimeofday (&tp, NULL);
3696   start_time = tp.tv_sec;
3697   now_time   = tp.tv_sec;
3698   pending    = TRUE;
3699
3700   while (pending && ((start_time + CMI_VMI_Connection_Timeout) > now_time)) {
3701     sched_yield ();
3702     status = VMI_Poll ();
3703     CMI_VMI_CHECK_SUCCESS (status, "VMI_Poll()");
3704
3705     gettimeofday (&tp, NULL);
3706     now_time = tp.tv_sec;
3707
3708     pending = FALSE;
3709     for (i = 0; i < _Cmi_numpes; i++) {
3710       pending = pending || ((&CMI_VMI_Processes[i])->connection_state != CMI_VMI_CONNECTION_CONNECTED);
3711     }
3712
3713     /* Every 30 seconds, retry any outgoing connections that had errors. */
3714     if (pending && ((now_time - start_time) % 30 == 0)) {
3715       for (i = 0; i < _Cmi_mype; i++) {
3716         if ((&CMI_VMI_Processes[i])->connection_state == CMI_VMI_CONNECTION_ERROR) {
3717           sprintf (remote_key, "%s:%u\0", CMI_VMI_Program_Key, (&CMI_VMI_Processes[i])->rank);
3718           CMI_VMI_Open_Connection (i, remote_key, connect_message_buffer);
3719         }
3720       }
3721     }
3722   }
3723
3724   /* Free memory. */
3725   free (remote_key);
3726   VMI_Buffer_Deallocate (connect_message_buffer);
3727
3728   /* Verify that there were no connection problems. */
3729   if (pending) {
3730     DEBUG_PRINT ("There were connection errors for process %d.\n", _Cmi_mype);
3731     return (-1);
3732   }
3733
3734   DEBUG_PRINT ("All connections are open for process %d.\n", _Cmi_mype);
3735
3736   return (0);
3737 }
3738
3739
3740
3741 /**************************************************************************
3742 **
3743 */
3744 int CMI_VMI_Open_Connection (int remote_rank, char *remote_key, PVMI_BUFFER connect_message_buffer)
3745 {