netlrts: remove AMMASSO related
[charm.git] / src / arch / net / machine-dgram.c
1 /** @file
2  * Datagram implementation of Converse NET version
3  * @ingroup NET
4  * converse basic message header:<br>
5  * d0 d1 d2 d3:  DgramHeader<br>
6  * d4 d5:        msg length (32-bit number)<br>
7  * hdl:          handler<br>
8  * xhdl:         extended handler<br>
9
10   moved from machine.c by 
11   Orion Sky Lawlor, olawlor@acm.org, 7/25/2001
12 */
13
14 /**
15  * @addtogroup NET
16  * @{
17  */
18
19
20 #if CMK_USE_IBVERBS | CMK_USE_IBUD
21 #include <infiniband/verbs.h>
22 #endif
23
24 #define DGRAM_HEADER_SIZE 8
25
26 #define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
27 #define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
28 #define CmiMsgNext(msg) (*((void**)(msg)))
29
30 #define DGRAM_ROOTPE_MASK   (0xFFFFu)
31 #define DGRAM_SRCPE_MASK    (0xFFFF)
32 #define DGRAM_MAGIC_MASK    (0xFF)
33 #define DGRAM_SEQNO_MASK    (0xFFFFu)
34
35 #if CMK_NODE_QUEUE_AVAILABLE
36 #define DGRAM_NODEBROADCAST (0xFA)
37 //#define DGRAM_NODEMESSAGE   (0xFB)
38 #endif
39 #define DGRAM_DSTRANK_MAX   (0xFC)
40 #define DGRAM_SIMPLEKILL    (0xFD)
41 #define DGRAM_BROADCAST     (0xFE)
42 #define DGRAM_ACKNOWLEDGE   (0xFF)
43
44 /* DgramHeader overlays the first 4 fields of the converse CMK_MSG_HEADER_BASIC,
45    defined in conv-common.h.  As such, its size and alignment are critical. */
46 typedef struct {
47         unsigned int seqno:16;  /* seq number in send window */
48         unsigned int srcpe:16;  /* CmiMyPe of the sender */
49         unsigned int dstrank:8; /* rank of destination processor */
50         unsigned int magic:8;   /* Low 8 bits of charmrun PID */
51         unsigned int rootpe:16; /* broadcast root processor */
52 } DgramHeader;
53
54
55 /* the window size needs to be Cmi_window_size + sizeof(unsigned int) bytes) */
56 typedef struct { DgramHeader head; char window[1024]; } DgramAck;
57
58 extern unsigned char computeCheckSum(unsigned char *data, int len);
59
60 #define DgramHeaderMake(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
61    DgramHeader *header = (DgramHeader *)(ptr);  \
62    header->seqno = seqno_; \
63    header->srcpe = srcpe_; \
64    header->dstrank = dstrank_; \
65    header->magic = magic_ & DGRAM_MAGIC_MASK; \
66    header->rootpe = root_; \
67 }
68
69 #define DgramHeaderBreak(ptr, dstrank_, srcpe_, magic_, seqno_, root_) { \
70    DgramHeader *header = (DgramHeader *)(ptr);  \
71    seqno_ = header->seqno; \
72    srcpe_ = header->srcpe; \
73    dstrank_ = header->dstrank; \
74    magic_ = header->magic; \
75    root_ = header->rootpe; \
76 }
77
78 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
79 static void randomCorrupt(char *data, int len)
80 {
81   if (0==(rand()%CMK_RANDOMLY_CORRUPT_MESSAGES))
82   { /* insert one random bit flip into this message: */
83     int badByte=rand()%len;
84     int badBit=rand()%8;
85     data[badByte]^=(1<<badBit);
86   } 
87 }
88 #endif
89
90 #define PE_BROADCAST_OTHERS (-101)
91 #define PE_BROADCAST_ALL    (-102)
92
93 #if CMK_NODE_QUEUE_AVAILABLE
94 #define NODE_BROADCAST_OTHERS (-201)
95 #define NODE_BROADCAST_ALL    (-202)
96 #endif
97
98 /********* Startup and Command-line args ********/
99 static int    Cmi_max_dgram_size;
100 static int    Cmi_os_buffer_size;
101 static int    Cmi_window_size;
102 static int    Cmi_half_window;
103 static double Cmi_delay_retransmit;
104 static double Cmi_ack_delay;
105 static int    Cmi_dgram_max_data;
106 static int    Cmi_comm_periodic_delay;
107 static int    Cmi_comm_clock_delay;
108 static int writeableAcks,writeableDgrams;/*Write-queue counts (to know when to sleep)*/
109
110 static void setspeed_atm()
111 {
112   Cmi_max_dgram_size   = 2048;
113   Cmi_os_buffer_size   = 50000;
114   Cmi_window_size      = 16;       /*20;*/
115   Cmi_delay_retransmit = 0.0150;
116   Cmi_ack_delay        = 0.0035;
117 }
118
119 static void setspeed_eth()
120 {
121   Cmi_max_dgram_size   = 1400;
122   Cmi_window_size      = 32;        /*40*/
123   Cmi_os_buffer_size   = Cmi_window_size*Cmi_max_dgram_size;
124   Cmi_delay_retransmit = 0.0400;
125   Cmi_ack_delay        = 0.0050;
126 }
127
128 static void setspeed_gigabit()
129 {
130   /* for gigabit net */
131   Cmi_max_dgram_size   = 9000;
132   Cmi_window_size      = 8;
133   Cmi_os_buffer_size   = 200000;
134   Cmi_delay_retransmit = 0.020;
135   Cmi_ack_delay        = 0.018;
136 }
137
138 static void extract_args(char **argv)
139 {
140   int ms;
141   setspeed_gigabit();
142   if (CmiGetArgFlagDesc(argv,"+atm","Tune for a low-latency ATM network"))
143     setspeed_atm();
144   if (CmiGetArgFlagDesc(argv,"+eth","Tune for an ethernet network"))
145     setspeed_eth();
146   if (CmiGetArgFlagDesc(argv,"+giga","Tune for a gigabit network"))
147     setspeed_gigabit();
148   CmiGetArgIntDesc(argv,"+max_dgram_size",&Cmi_max_dgram_size,"Size of each UDP packet");
149   CmiGetArgIntDesc(argv,"+window_size",&Cmi_window_size,"Number of unacknowledged packets");
150   /* must divide for window protocol to work */
151   if ( (DGRAM_SEQNO_MASK+1)%Cmi_window_size != 0)
152     CmiAbort("Invalid window size!");
153   CmiGetArgIntDesc(argv,"+os_buffer_size",&Cmi_os_buffer_size, "UDP socket's SO_RCVBUF/SO_SNDBUF");
154   if (CmiGetArgIntDesc(argv,"+delay_retransmit",&ms, "Milliseconds to wait before retransmit"))
155           Cmi_delay_retransmit=0.001*ms;
156   if (CmiGetArgIntDesc(argv,"+ack_delay",&ms, "Milliseconds to wait before ack'ing"))
157           Cmi_ack_delay=0.001*ms;
158   extract_common_args(argv);
159   Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
160   Cmi_half_window = Cmi_window_size >> 1;
161   if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
162     KillEveryone("Window size too big for OS buffer.");
163   Cmi_comm_periodic_delay=(int)(1000*Cmi_delay_retransmit);
164   if (Cmi_comm_periodic_delay>60) Cmi_comm_periodic_delay=60;
165   Cmi_comm_clock_delay=(int)(1000*Cmi_ack_delay);
166   if (sizeof(DgramHeader)!=DGRAM_HEADER_SIZE) {
167     CmiAbort("DatagramHeader in machine-dgram.c is the wrong size!\n");
168   }
169 }
170
171 /* Compare seqnos using modular arithmetic-- currently unused
172 static int seqno_in_window(unsigned int seqno,unsigned int winStart)
173 {
174   return ((DGRAM_SEQNO_MASK&(seqno-winStart)) < Cmi_window_size);
175 }
176 static int seqno_lt(unsigned int seqA,unsigned int seqB)
177 {
178   unsigned int del=seqB-seqA;
179   return (del>0u) && (del<(DGRAM_SEQNO_MASK/2));
180 }
181 static int seqno_le(unsigned int seqA,unsigned int seqB)
182 {
183   unsigned int del=seqB-seqA;
184   return (del>=0u) && (del<(DGRAM_SEQNO_MASK/2));
185 }
186 */
187
188
189 /*****************************************************************************
190  *
191  * Communication Structures
192  *
193  *****************************************************************************/
194
195 typedef struct OutgoingMsgStruct
196 {
197   struct OutgoingMsgStruct *next;
198   int   src, dst;
199   int   size;
200   char *data;
201   int   refcount;
202   int   freemode;
203 }
204 *OutgoingMsg;
205
206 typedef struct ExplicitDgramStruct
207 {
208   struct ExplicitDgramStruct *next;
209   int  srcpe, rank, seqno, broot;
210   unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */
211   double data[1];
212 }
213 *ExplicitDgram;
214
215 typedef struct ImplicitDgramStruct
216 {
217   struct ImplicitDgramStruct *next;
218   struct OtherNodeStruct *dest;
219   int srcpe, rank, seqno, broot;
220   char  *dataptr;
221   int    datalen;
222   OutgoingMsg ogm;
223 }
224 *ImplicitDgram;
225
226 struct PendingMsgStruct;
227
228
229 #if CMK_USE_IBUD
230 struct infiOtherNodeData;
231 struct infiOtherNodeData *initinfiData(int node,int lid,int qpn,int psn);
232 #endif
233 #if CMK_USE_IBVERBS
234 struct infiOtherNodeData;
235 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]);
236 void    infiPostInitialRecvs();
237 #endif
238
239 typedef struct FutureMessageStruct {
240   char *msg;
241   int len;
242 } *FutureMessage;
243
244 typedef struct OtherNodeStruct
245 {
246   int nodestart, nodesize;
247   skt_ip_t IP;
248   unsigned int mach_id;
249   unsigned int dataport;
250   struct sockaddr_in addr;
251 #if CMK_USE_TCP 
252   SOCKET        sock;           /* for TCP */
253 #endif
254 #if CMK_USE_MX
255   CmiUInt8 nic_id;
256   mx_endpoint_addr_t       endpoint_addr;
257   CdsFifo                  futureMsgs;   /* out-of-order */
258 #endif
259
260   unsigned int             send_last;    /* seqno of last dgram sent */
261   ImplicitDgram           *send_window;  /* datagrams sent, not acked */
262   ImplicitDgram            send_queue_h; /* head of send queue */
263   ImplicitDgram            send_queue_t; /* tail of send queue */
264   unsigned int             send_next;    /* next seqno to go into queue */
265   unsigned int             send_good;    /* last acknowledged seqno */
266   double                   send_primer;  /* time to send retransmit */
267   unsigned int             send_ack_seqno; /* next ack seqno to send */
268   int                      retransmit_leash; /*Maximum number of packets to retransmit*/
269 #if CMK_USE_GM
270   struct PendingMsgStruct *sendhead, *sendtail;  /* gm send queue */
271   int                      disable;
272   int                      gm_pending;
273 #endif
274
275 #if CMK_USE_IBVERBS | CMK_USE_IBUD
276         struct infiOtherNodeData *infiData;
277 #endif
278
279   int                      asm_rank;
280   int                      asm_total;
281   int                      asm_fill;
282   char                    *asm_msg;
283   
284   int                      recv_ack_cnt; /* number of unacked dgrams */
285   double                   recv_ack_time;/* time when ack should be sent */
286   unsigned int             recv_expect;  /* next dgram to expect */
287   ExplicitDgram           *recv_window;  /* Packets received, not integrated */
288   int                      recv_winsz;   /* Number of packets in recv window */
289   unsigned int             recv_next;    /* Seqno of first missing packet */
290   unsigned int             recv_ack_seqno; /* last ack seqno received */
291
292   unsigned int             stat_total_intr; /* Total Number of Interrupts */
293   unsigned int             stat_proc_intr;  /* Processed Interrupts */
294   unsigned int             stat_send_pkt;   /* number of packets sent */
295   unsigned int             stat_resend_pkt; /* number of packets resent */
296   unsigned int             stat_send_ack;   /* number of acks sent */
297   unsigned int             stat_recv_pkt;   /* number of packets received */
298   unsigned int             stat_recv_ack;   /* number of acks received */
299   unsigned int             stat_ack_pkts;   /* packets acked */
300   unsigned int             stat_consec_resend; /*Packets retransmitted since last ack*/ 
301
302   unsigned int sent_msgs;
303   unsigned int recd_msgs;
304   unsigned int sent_bytes;
305   unsigned int recd_bytes;
306 }
307 *OtherNode;
308
309 static void OtherNode_init(OtherNode node)
310 {
311     int i;
312     node->send_primer = 1.0e30; /*Don't retransmit until needed*/
313     node->retransmit_leash = 1; /*Start with short leash*/
314     node->send_last=0;
315     node->send_window =
316       (ImplicitDgram*)malloc(Cmi_window_size*sizeof(ImplicitDgram));
317     for (i=0;i<Cmi_window_size;i++) node->send_window[i]=NULL;
318     node->send_queue_h=node->send_queue_t=NULL;
319     node->send_next=0;
320     node->send_good=(unsigned int)(-1);
321     node->send_ack_seqno=0;
322 #if CMK_USE_GM
323     node->sendhead = node->sendtail = NULL;
324     node->disable = 0;
325     node->gm_pending = 0;
326 #endif
327 #if CMK_USE_MX
328     node->futureMsgs = CdsFifo_Create();
329 #endif
330
331     /*
332     TODO: The initial values of the Ammasso related members will be set by the machine layer
333           as the QPs are being created (along with any initial values).  After all the details
334           of the layer are figured out, put some defaults here just so they are initialized to
335           known values.  (Though, it should not be a problem that they are not initialized here yet.)
336     */
337
338     node->asm_rank=0;
339     node->asm_total=0;
340     node->asm_fill=0;
341     node->asm_msg=0;
342     
343     node->recv_ack_cnt=0;
344     node->recv_ack_time=1.0e30;
345     node->recv_ack_seqno=0;
346     node->recv_expect=0;
347     node->recv_window =
348       (ExplicitDgram*)malloc(Cmi_window_size*sizeof(ExplicitDgram));
349     for (i=0;i<Cmi_window_size;i++) node->recv_window[i]=NULL;    
350     node->recv_winsz=0;
351     node->recv_next=0;
352
353     node->stat_total_intr=0;
354     node->stat_proc_intr=0;
355     node->stat_send_pkt=0;
356     node->stat_resend_pkt=0;
357     node->stat_send_ack=0; 
358     node->stat_recv_pkt=0;      
359     node->stat_recv_ack=0;        
360     node->stat_ack_pkts=0;
361
362     node->sent_msgs = 0;
363     node->recd_msgs = 0;
364     node->sent_bytes = 0;
365     node->recd_bytes = 0;
366 }
367
368 static OtherNode *nodes_by_pe;  /* OtherNodes indexed by processor number */
369 static OtherNode  nodes;        /* Indexed only by ``node number'' */
370
371 #ifdef CMK_USE_SPECIAL_MESSAGE_QUEUE_CHECK
372 /** Return 1 if our outgoing message queue 
373    for this node is longer than this many bytes. */
374 int CmiLongSendQueue(int forNode,int longerThan) {
375         int ret=0;
376         ImplicitDgram dg;
377         CmiCommLock();
378         dg=nodes[forNode].send_queue_h;
379         while (longerThan>0 && dg) {
380                 longerThan-=dg->datalen;
381                 dg=dg->next;
382         }
383         CmiCommUnlock();
384         return ret;
385 }
386 #endif
387
388 extern void CmiGmConvertMachineID(unsigned int *mach_id);
389 extern void CmiAmmassoNodeAddressesStoreHandler(int pe, struct sockaddr_in *addr, int port);
390
391 /* initnode node table reply format:
392  +------------------------------------------------------- 
393  | 4 bytes  |   Number of nodes n                       ^
394  |          |   (big-endian binary integer)       4+12*n bytes
395  +-------------------------------------------------     |
396  ^  |        (one entry for each node)            ^     |
397  |  | 4 bytes  |   Number of PEs for this node    |     |
398  n  | 4 bytes  |   IP address of this node   12*n bytes |
399  |  | 4 bytes  |   Data (UDP) port of this node   |     |
400  v  |          |   (big-endian binary integers)   v     v
401  ---+----------------------------------------------------
402 */
403 static void node_addresses_store(ChMessage *msg)
404 {
405   ChMessageInt_t *n32=(ChMessageInt_t *)msg->data;
406   ChNodeinfo *d=(ChNodeinfo *)(n32+1);
407   int nodestart;
408   int i,j,n;
409   MACHSTATE(1,"node_addresses_store {");        
410   _Cmi_numnodes=ChMessageInt(n32[0]);
411
412 #if CMK_USE_IBVERBS
413   ChInfiAddr *remoteInfiAddr = (ChInfiAddr *) (&msg->data[sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes]);
414   if (Cmi_charmrun_fd == -1) {
415     d = &((ChSingleNodeinfo*)n32)->info;
416   }
417   else if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes +sizeof(ChInfiAddr)*_Cmi_numnodes )
418          !=(unsigned int)msg->len)
419     {printf("Node table has inconsistent length!");machine_exit(1);}
420
421 #else
422
423   if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes)
424          !=(unsigned int)msg->len)
425     {printf("Node table has inconsistent length!");machine_exit(1);}
426 #endif /*CMK_USE_IBVERBS*/
427   nodes = (OtherNode)malloc(_Cmi_numnodes * sizeof(struct OtherNodeStruct));
428   nodestart=0;
429   for (i=0; i<_Cmi_numnodes; i++) {
430     nodes[i].nodestart = nodestart;
431     nodes[i].nodesize  = ChMessageInt(d[i].nPE);
432     MACHSTATE2(3,"node %d nodesize %d",i,nodes[i].nodesize);
433     nodes[i].mach_id = ChMessageInt(d[i].mach_id);
434 #if CMK_USE_MX
435     nodes[i].nic_id = ChMessageLong(d[i].nic_id);
436 #endif
437
438 #if CMK_USE_IBUD
439     nodes[i].infiData=initinfiData(i,ChMessageInt(d[i].qp.lid),ChMessageInt(d[i].qp.qpn),ChMessageInt(d[i].qp.psn));
440 #endif
441
442 #if CMK_USE_GM
443     CmiGmConvertMachineID(& nodes[i].mach_id);
444 #endif
445     nodes[i].IP=d[i].IP;
446     if (i==_Cmi_mynode) {
447       Cmi_nodestart=nodes[i].nodestart;
448       _Cmi_mynodesize=nodes[i].nodesize;
449       Cmi_self_IP=nodes[i].IP;
450     }
451
452 #if CMK_USE_IBVERBS 
453     if(i != _Cmi_mynode){
454         int addr[3];
455         addr[0] =ChMessageInt(remoteInfiAddr[i].lid);
456         addr[1] =ChMessageInt(remoteInfiAddr[i].qpn);
457         addr[2] =ChMessageInt(remoteInfiAddr[i].psn);
458         nodes[i].infiData = initInfiOtherNodeData(i,addr);
459     }
460 #else
461     nodes[i].dataport = ChMessageInt(d[i].dataport);
462     nodes[i].addr = skt_build_addr(nodes[i].IP,nodes[i].dataport);
463 #endif
464
465 #if CMK_USE_TCP
466     nodes[i].sock = INVALID_SOCKET;
467 #endif
468     nodestart+=nodes[i].nodesize;
469
470   }
471   _Cmi_numpes=nodestart;
472   n = _Cmi_numpes;
473 #ifdef CMK_CPV_IS_SMP
474   n += _Cmi_numnodes;
475 #endif
476   nodes_by_pe = (OtherNode*)malloc(n * sizeof(OtherNode));
477   _MEMCHECK(nodes_by_pe);
478   for (i=0; i<_Cmi_numnodes; i++) {
479     OtherNode node = nodes + i;
480     OtherNode_init(node);
481     for (j=0; j<node->nodesize; j++)
482       nodes_by_pe[j + node->nodestart] = node;
483   }
484 #ifdef CMK_CPV_IS_SMP
485   /* index for communication threads */
486   for (i=_Cmi_numpes; i<_Cmi_numpes+_Cmi_numnodes; i++) {
487     OtherNode node = nodes + i-_Cmi_numpes;
488     nodes_by_pe[i] = node;
489   }
490 #endif
491 #if CMK_USE_IBVERBS
492   infiPostInitialRecvs();
493 #endif
494   MACHSTATE(1,"} node_addresses_store");
495 }
496
497 /**
498  * Printing Net Statistics -- milind
499  */
500 static char statstr[10000];
501
502 void printNetStatistics(void)
503 {
504   char tmpstr[1024];
505   OtherNode myNode;
506   int i;
507   unsigned int send_pkt=0, resend_pkt=0, recv_pkt=0, send_ack=0;
508   unsigned int recv_ack=0, ack_pkts=0;
509
510   myNode = nodes+CmiMyNode();
511   sprintf(tmpstr, "***********************************\n");
512   strcpy(statstr, tmpstr);
513   sprintf(tmpstr, "Net Statistics For Node %u\n", CmiMyNode());
514   strcat(statstr, tmpstr);
515   sprintf(tmpstr, "Interrupts: %u \tProcessed: %u\n",
516                   myNode->stat_total_intr, myNode->stat_proc_intr);
517   strcat(statstr, tmpstr);
518   sprintf(tmpstr, "Total Msgs Sent: %u \tTotal Bytes Sent: %u\n",
519                   myNode->sent_msgs, myNode->sent_bytes);
520   strcat(statstr, tmpstr);
521   sprintf(tmpstr, "Total Msgs Recv: %u \tTotal Bytes Recv: %u\n",
522                   myNode->recd_msgs, myNode->recd_bytes);
523   strcat(statstr, tmpstr);
524   sprintf(tmpstr, "***********************************\n");
525   strcat(statstr, tmpstr);
526   sprintf(tmpstr, "[Num]\tSENDTO\tRESEND\tRECV\tACKSTO\tACKSFRM\tPKTACK\n");
527   strcat(statstr,tmpstr);
528   sprintf(tmpstr, "=====\t======\t======\t====\t======\t=======\t======\n");
529   strcat(statstr,tmpstr);
530   for(i=0;i<CmiNumNodes();i++) {
531     OtherNode node = nodes+i;
532     sprintf(tmpstr, "[%u]\t%u\t%u\t%u\t%u\t%u\t%u\n",
533                      i, node->stat_send_pkt, node->stat_resend_pkt,
534                      node->stat_recv_pkt, node->stat_send_ack,
535                      node->stat_recv_ack, node->stat_ack_pkts);
536     strcat(statstr, tmpstr);
537     send_pkt += node->stat_send_pkt;
538     recv_pkt += node->stat_recv_pkt;
539     resend_pkt += node->stat_resend_pkt;
540     send_ack += node->stat_send_ack;
541     recv_ack += node->stat_recv_ack;
542     ack_pkts += node->stat_ack_pkts;
543   }
544   sprintf(tmpstr, "[TOTAL]\t%u\t%u\t%u\t%u\t%u\t%u\n",
545                      send_pkt, resend_pkt,
546                      recv_pkt, send_ack,
547                      recv_ack, ack_pkts);
548   strcat(statstr, tmpstr);
549   sprintf(tmpstr, "***********************************\n");
550   strcat(statstr, tmpstr);
551   CmiPrintf(statstr);
552 }
553
554
555 /************** free list management *****************/
556
557 static ExplicitDgram Cmi_freelist_explicit;
558 static ImplicitDgram Cmi_freelist_implicit;
559 /*static OutgoingMsg   Cmi_freelist_outgoing;*/
560
561 #define FreeImplicitDgram(dg) {\
562   ImplicitDgram d=(dg);\
563   d->next = Cmi_freelist_implicit;\
564   Cmi_freelist_implicit = d;\
565 }
566
567 #define MallocImplicitDgram(dg) {\
568   ImplicitDgram d = Cmi_freelist_implicit;\
569   if (d==0) {d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
570              _MEMCHECK(d);\
571   } else Cmi_freelist_implicit = d->next;\
572   dg = d;\
573 }
574
575 #define FreeExplicitDgram(dg) {\
576   ExplicitDgram d=(dg);\
577   d->next = Cmi_freelist_explicit;\
578   Cmi_freelist_explicit = d;\
579 }
580
581 #define MallocExplicitDgram(dg) {\
582   ExplicitDgram d = Cmi_freelist_explicit;\
583   if (d==0) { d = ((ExplicitDgram)malloc \
584                    (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
585               _MEMCHECK(d);\
586   } else Cmi_freelist_explicit = d->next;\
587   dg = d;\
588 }
589
590 /* Careful with these next two, need concurrency control */
591
592 #define FreeOutgoingMsg(m) (free(m))
593 #define MallocOutgoingMsg(m)\
594     {(m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct))); _MEMCHECK(m);}
595
596 /****************************************************************************
597  *                                                                          
598  * CheckSocketsReady
599  *
600  * Checks both sockets to see which are readable and which are writeable.
601  * We check all these things at the same time since this can be done for
602  * free with ``select.'' The result is stored in global variables, since
603  * this is essentially global state information and several routines need it.
604  *
605  ***************************************************************************/
606
607 static int ctrlskt_ready_read;
608 static int dataskt_ready_read;
609 static int dataskt_ready_write;
610
611 /******************************************************************************
612  *
613  * Transmission Code
614  *
615  *****************************************************************************/
616
617 void GarbageCollectMsg(OutgoingMsg ogm)
618 {
619   MACHSTATE2(3,"GarbageCollectMsg called on ogm %p refcount %d",ogm,ogm->refcount);
620         if (ogm->refcount == 0) {
621     if (ogm->freemode == 'A') {
622       ogm->freemode = 'X';
623     } else {
624       if (ogm->freemode != 'G') CmiFree(ogm->data);
625       FreeOutgoingMsg(ogm);
626     }
627   }
628 }
629
630 void DiscardImplicitDgram(ImplicitDgram dg)
631 {
632   OutgoingMsg ogm;
633   ogm = dg->ogm;
634   ogm->refcount--;
635   GarbageCollectMsg(ogm);
636   FreeImplicitDgram(dg);
637 }
638
639 /*
640  Check the real-time clock and perform periodic tasks.
641  Must be called with comm. lock held.
642  */
643 static double Cmi_ack_last, Cmi_check_last;
644 static void CommunicationsClock(void)
645 {
646   MACHSTATE(1,"CommunicationsClock");
647   Cmi_clock = GetClock();
648   if (Cmi_clock > Cmi_ack_last + 0.5*Cmi_ack_delay) {
649     MACHSTATE(2,"CommunicationsClock timing out acks");    
650     Cmi_ack_last=Cmi_clock;
651     writeableAcks=1;
652     writeableDgrams=1;
653   }
654   
655   if (Cmi_clock > Cmi_check_last + Cmi_check_delay) {
656     MACHSTATE(4,"CommunicationsClock pinging charmrun");       
657     Cmi_check_last = Cmi_clock; 
658     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
659   }
660 }
661
662 #if CMK_SHARED_VARS_UNAVAILABLE
663 static void CommunicationsClockCaller(void *ignored)
664 {
665   CmiCommLock();
666   CommunicationsClock();
667   CmiCommUnlock();
668   CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);  
669 }
670
671 static void CommunicationPeriodic(void) 
672 { /*Poll on the communications server*/
673   CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
674 }
675
676 static void CommunicationPeriodicCaller(void *ignored)
677 {
678   CommunicationPeriodic();
679   CcdCallFnAfter((CcdVoidFn)CommunicationPeriodicCaller,NULL,Cmi_comm_periodic_delay);
680 }
681 #endif
682
683 /* common hardware dependent API */
684 /*void EnqueueOutgoingDgram(OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot);*/
685 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy);
686
687 //void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int nodesend);
688 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int curcycle, int nodesend);
689
690 #if CMK_USE_GM
691
692 #include "machine-gm.c"
693
694 #elif CMK_USE_MX
695
696 #include "machine-mx.c"
697
698 #elif CMK_USE_TCP
699
700 #include "machine-tcp.c"
701
702 #elif CMK_USE_IBVERBS
703
704 #include "machine-ibverbs.c"
705
706 #elif CMK_USE_IBUD
707 #include "machine-ibud.c"
708
709 #else
710
711 #include "machine-eth.c"
712
713 #endif
714
715
716 /*@}*/