686b67d1ba4ad334e45fa7068708bd2b6fae7ce2
[charm.git] / src / arch / net / machine-dgram.c
1 /*
2   Datagram implementation of Converse NET version
3
4   moved from machine.c by 
5   Orion Sky Lawlor, olawlor@acm.org, 7/25/2001
6 */
7
8 /**
9   converse basic message header:
10   d0 d1 d2 d3:  DgramHeader
11   d4 d5:        msg length (32-bit number)
12   hdl:          handler
13   xhdl:         extended handler
14 */
15 #define DGRAM_HEADER_SIZE 8
16
17 #define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
18 #define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
19 #define CmiMsgNext(msg) (*((void**)(msg)))
20
21 #define DGRAM_SRCPE_MASK    (0xFFFF)
22 #define DGRAM_MAGIC_MASK    (0xFF)
23 #define DGRAM_SEQNO_MASK    (0xFFFFFFFFu)
24
25 #if CMK_NODE_QUEUE_AVAILABLE
26 #define DGRAM_NODEMESSAGE   (0xFB)
27 #endif
28 #define DGRAM_DSTRANK_MAX   (0xFC)
29 #define DGRAM_SIMPLEKILL    (0xFD)
30 #define DGRAM_BROADCAST     (0xFE)
31 #define DGRAM_ACKNOWLEDGE   (0xFF)
32
33 /* DgramHeader overlays the first 4 fields of the converse CMK_MSG_HEADER_BASIC,
34    defined in conv-common.h.  As such, its size and alignment are critical. */
35 typedef struct {
36         unsigned int seqno:32;  /* seq number in send window */
37         unsigned int srcpe:16;  /* CmiMyPe of the sender */
38         unsigned int dstrank:8; /* rank of destination processor */
39         unsigned int magic:8;   /* Low 8 bits of charmrun PID */
40 } DgramHeader;
41
42
43 /* the window size needs to be Cmi_window_size + sizeof(unsigned int) bytes) */
44 typedef struct { DgramHeader head; char window[1024]; } DgramAck;
45
46 static unsigned char computeCheckSum(unsigned char *data, int len)
47 {
48   int i;
49   unsigned char ret = 0;
50   for (i=0; i<len; i++) ret ^= (unsigned char)data[i];
51   return ret;
52 }
53
54 #define DgramHeaderMake(ptr, dstrank_, srcpe_, magic_, seqno_) { \
55    DgramHeader *header = (DgramHeader *)(ptr);  \
56    header->seqno = seqno_; \
57    header->srcpe = srcpe_; \
58    header->dstrank = dstrank_; \
59    header->magic = magic_ & DGRAM_MAGIC_MASK; \
60 }
61
62 #define DgramHeaderBreak(ptr, dstrank_, srcpe_, magic_, seqno_) { \
63    DgramHeader *header = (DgramHeader *)(ptr);  \
64    seqno_ = header->seqno; \
65    srcpe_ = header->srcpe; \
66    dstrank_ = header->dstrank; \
67    magic_ = header->magic; \
68 }
69
70 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
71 static void randomCorrupt(char *data, int len)
72 {
73   if (0==(rand()%CMK_RANDOMLY_CORRUPT_MESSAGES))
74   { /* insert one random bit flip into this message: */
75     int badByte=rand()%len;
76     int badBit=rand()%8;
77     data[badByte]^=(1<<badBit);
78   } 
79 }
80 #endif
81
82 #define PE_BROADCAST_OTHERS (-101)
83 #define PE_BROADCAST_ALL    (-102)
84
85 #if CMK_NODE_QUEUE_AVAILABLE
86 #define NODE_BROADCAST_OTHERS (-201)
87 #define NODE_BROADCAST_ALL    (-202)
88 #endif
89
90 /********* Startup and Command-line args ********/
91 static int    Cmi_max_dgram_size;
92 static int    Cmi_os_buffer_size;
93 static int    Cmi_window_size;
94 static int    Cmi_half_window;
95 static double Cmi_delay_retransmit;
96 static double Cmi_ack_delay;
97 static int    Cmi_dgram_max_data;
98 static int    Cmi_comm_periodic_delay;
99 static int    Cmi_comm_clock_delay;
100 static int writeableAcks,writeableDgrams;/*Write-queue counts (to know when to sleep)*/
101
102 static void setspeed_atm()
103 {
104   Cmi_max_dgram_size   = 2048;
105   Cmi_os_buffer_size   = 50000;
106   Cmi_window_size      = 20;
107   Cmi_delay_retransmit = 0.0150;
108   Cmi_ack_delay        = 0.0035;
109 }
110
111 static void setspeed_eth()
112 {
113   Cmi_max_dgram_size   = 1400;
114   Cmi_window_size      = 40;
115   Cmi_os_buffer_size   = Cmi_window_size*Cmi_max_dgram_size;
116   Cmi_delay_retransmit = 0.0400;
117   Cmi_ack_delay        = 0.0050;
118 }
119
120 static void setspeed_gigabit()
121 {
122   /* for gigabit net */
123   Cmi_max_dgram_size   = 9000;
124   Cmi_window_size      = 8;
125   Cmi_os_buffer_size   = 200000;
126   Cmi_delay_retransmit = 0.020;
127   Cmi_ack_delay        = 0.018;
128 }
129
130 static void extract_args(char **argv)
131 {
132   int ms;
133   setspeed_eth();
134   if (CmiGetArgFlagDesc(argv,"+atm","Tune for a low-latency ATM network"))
135     setspeed_atm();
136   if (CmiGetArgFlagDesc(argv,"+eth","Tune for an ethernet network"))
137     setspeed_eth();
138   if (CmiGetArgFlagDesc(argv,"+giga","Tune for a gigabit network"))
139     setspeed_gigabit();
140   CmiGetArgIntDesc(argv,"+max_dgram_size",&Cmi_max_dgram_size,"Size of each UDP packet");
141   CmiGetArgIntDesc(argv,"+window_size",&Cmi_window_size,"Number of unacknowledged packets");
142   CmiGetArgIntDesc(argv,"+os_buffer_size",&Cmi_os_buffer_size, "UDP socket's SO_RCVBUF/SO_SNDBUF");
143   if (CmiGetArgIntDesc(argv,"+delay_retransmit",&ms, "Milliseconds to wait before retransmit"))
144           Cmi_delay_retransmit=0.001*ms;
145   if (CmiGetArgIntDesc(argv,"+ack_delay",&ms, "Milliseconds to wait before ack'ing"))
146           Cmi_delay_retransmit=0.001*ms;
147   extract_common_args(argv);
148   Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
149   Cmi_half_window = Cmi_window_size >> 1;
150   if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
151     KillEveryone("Window size too big for OS buffer.");
152   Cmi_comm_periodic_delay=(int)(1000*Cmi_delay_retransmit);
153   if (Cmi_comm_periodic_delay>60) Cmi_comm_periodic_delay=60;
154   Cmi_comm_clock_delay=(int)(1000*Cmi_ack_delay);
155   if (sizeof(DgramHeader)!=DGRAM_HEADER_SIZE) {
156     CmiAbort("DatagramHeader in machine-dgram.c is the wrong size!\n");
157   }
158 }
159
160 /* Compare seqnos using modular arithmetic-- currently unused
161 static int seqno_in_window(unsigned int seqno,unsigned int winStart)
162 {
163   return ((DGRAM_SEQNO_MASK&(seqno-winStart)) < Cmi_window_size);
164 }
165 static int seqno_lt(unsigned int seqA,unsigned int seqB)
166 {
167   unsigned int del=seqB-seqA;
168   return (del>0u) && (del<(DGRAM_SEQNO_MASK/2));
169 }
170 static int seqno_le(unsigned int seqA,unsigned int seqB)
171 {
172   unsigned int del=seqB-seqA;
173   return (del>=0u) && (del<(DGRAM_SEQNO_MASK/2));
174 }
175 */
176
177
178 /*****************************************************************************
179  *
180  * Communication Structures
181  *
182  *****************************************************************************/
183
184 typedef struct OutgoingMsgStruct
185 {
186   struct OutgoingMsgStruct *next;
187   int   src, dst;
188   int   size;
189   char *data;
190   int   refcount;
191   int   freemode;
192 }
193 *OutgoingMsg;
194
195 typedef struct ExplicitDgramStruct
196 {
197   struct ExplicitDgramStruct *next;
198   int  srcpe, rank, seqno;
199   unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */
200   double data[1];
201 }
202 *ExplicitDgram;
203
204 typedef struct ImplicitDgramStruct
205 {
206   struct ImplicitDgramStruct *next;
207   struct OtherNodeStruct *dest;
208   int srcpe, rank, seqno;
209   char  *dataptr;
210   int    datalen;
211   OutgoingMsg ogm;
212 }
213 *ImplicitDgram;
214
215 typedef struct OtherNodeStruct
216 {
217   int nodestart, nodesize;
218   skt_ip_t IP;
219   unsigned int mach_id;
220   unsigned int dataport;
221   struct sockaddr_in addr;
222 #if CMK_USE_TCP
223   SOCKET        sock;           /* for TCP */
224 #endif
225
226   unsigned int             send_last;    /* seqno of last dgram sent */
227   ImplicitDgram           *send_window;  /* datagrams sent, not acked */
228   ImplicitDgram            send_queue_h; /* head of send queue */
229   ImplicitDgram            send_queue_t; /* tail of send queue */
230   unsigned int             send_next;    /* next seqno to go into queue */
231   unsigned int             send_good;    /* last acknowledged seqno */
232   double                   send_primer;  /* time to send retransmit */
233   unsigned int             send_ack_seqno; /* next ack seqno to send */
234   int                      retransmit_leash; /*Maximum number of packets to retransmit*/
235
236   int                      asm_rank;
237   int                      asm_total;
238   int                      asm_fill;
239   char                    *asm_msg;
240   
241   int                      recv_ack_cnt; /* number of unacked dgrams */
242   double                   recv_ack_time;/* time when ack should be sent */
243   unsigned int             recv_expect;  /* next dgram to expect */
244   ExplicitDgram           *recv_window;  /* Packets received, not integrated */
245   int                      recv_winsz;   /* Number of packets in recv window */
246   unsigned int             recv_next;    /* Seqno of first missing packet */
247   unsigned int             recv_ack_seqno; /* last ack seqno received */
248
249   unsigned int             stat_total_intr; /* Total Number of Interrupts */
250   unsigned int             stat_proc_intr;  /* Processed Interrupts */
251   unsigned int             stat_send_pkt;   /* number of packets sent */
252   unsigned int             stat_resend_pkt; /* number of packets resent */
253   unsigned int             stat_send_ack;   /* number of acks sent */
254   unsigned int             stat_recv_pkt;   /* number of packets received */
255   unsigned int             stat_recv_ack;   /* number of acks received */
256   unsigned int             stat_ack_pkts;   /* packets acked */
257   unsigned int             stat_consec_resend; /*Packets retransmitted since last ack*/ 
258
259   int sent_msgs;
260   int recd_msgs;
261   int sent_bytes;
262   int recd_bytes;
263 }
264 *OtherNode;
265
266 static void OtherNode_init(OtherNode node)
267 {
268     int i;
269     node->send_primer = 1.0e30; /*Don't retransmit until needed*/
270     node->retransmit_leash = 1; /*Start with short leash*/
271     node->send_last=0;
272     node->send_window =
273       (ImplicitDgram*)malloc(Cmi_window_size*sizeof(ImplicitDgram));
274     for (i=0;i<Cmi_window_size;i++) node->send_window[i]=NULL;
275     node->send_queue_h=node->send_queue_t=NULL;
276     node->send_next=0;
277     node->send_good=(unsigned int)(-1);
278     node->send_ack_seqno=0;
279
280     node->asm_rank=0;
281     node->asm_total=0;
282     node->asm_fill=0;
283     node->asm_msg=0;
284     
285     node->recv_ack_cnt=0;
286     node->recv_ack_time=1.0e30;
287     node->recv_ack_seqno=0;
288     node->recv_expect=0;
289     node->recv_window =
290       (ExplicitDgram*)malloc(Cmi_window_size*sizeof(ExplicitDgram));
291     for (i=0;i<Cmi_window_size;i++) node->recv_window[i]=NULL;    
292     node->recv_winsz=0;
293     node->recv_next=0;
294
295     node->stat_total_intr=0;
296     node->stat_proc_intr=0;
297     node->stat_send_pkt=0;
298     node->stat_resend_pkt=0;
299     node->stat_send_ack=0; 
300     node->stat_recv_pkt=0;      
301     node->stat_recv_ack=0;        
302     node->stat_ack_pkts=0;
303
304     node->sent_msgs = 0;
305     node->recd_msgs = 0;
306     node->sent_bytes = 0;
307     node->recd_bytes = 0;
308 }
309
310 static OtherNode *nodes_by_pe;  /* OtherNodes indexed by processor number */
311 static OtherNode  nodes;        /* Indexed only by ``node number'' */
312
313 /* initnode node table reply format:
314  +------------------------------------------------------- 
315  | 4 bytes  |   Number of nodes n                       ^
316  |          |   (big-endian binary integer)       4+12*n bytes
317  +-------------------------------------------------     |
318  ^  |        (one entry for each node)            ^     |
319  |  | 4 bytes  |   Number of PEs for this node    |     |
320  n  | 4 bytes  |   IP address of this node   12*n bytes |
321  |  | 4 bytes  |   Data (UDP) port of this node   |     |
322  v  |          |   (big-endian binary integers)   v     v
323  ---+----------------------------------------------------
324 */
325 static void node_addresses_store(ChMessage *msg)
326 {
327   ChMessageInt_t *n32=(ChMessageInt_t *)msg->data;
328   ChNodeinfo *d=(ChNodeinfo *)(n32+1);
329   int nodestart;
330   int i,j,n;
331   _Cmi_numnodes=ChMessageInt(n32[0]);
332   
333   if ((sizeof(ChMessageInt_t)+sizeof(ChNodeinfo)*_Cmi_numnodes)
334          !=(unsigned int)msg->len)
335     {printf("Node table has inconsistent length!");machine_exit(1);}
336   nodes = (OtherNode)malloc(_Cmi_numnodes * sizeof(struct OtherNodeStruct));
337   nodestart=0;
338   for (i=0; i<_Cmi_numnodes; i++) {
339     nodes[i].nodestart = nodestart;
340     nodes[i].nodesize  = ChMessageInt(d[i].nPE);
341     nodes[i].mach_id = ChMessageInt(d[i].mach_id);
342     nodes[i].IP=d[i].IP;
343     if (i==_Cmi_mynode) {
344       Cmi_nodestart=nodes[i].nodestart;
345       _Cmi_mynodesize=nodes[i].nodesize;
346       Cmi_self_IP=nodes[i].IP;
347     }
348     nodes[i].dataport = ChMessageInt(d[i].dataport);
349     MACHSTATE4(2,"Nodetable[%d]={'pe' %d,IP=%08x,port=%d}",
350                i,nodes[i].nodestart,nodes[i].IP,nodes[i].dataport);
351
352     nodes[i].addr = skt_build_addr(nodes[i].IP,nodes[i].dataport);
353 #if CMK_USE_TCP
354     nodes[i].sock = INVALID_SOCKET;
355 #endif
356     nodestart+=nodes[i].nodesize;
357   }
358   _Cmi_numpes=nodestart;
359   n = _Cmi_numpes;
360 #ifdef CMK_CPV_IS_SMP
361   n += _Cmi_numnodes;
362 #endif
363   nodes_by_pe = (OtherNode*)malloc(n * sizeof(OtherNode));
364   _MEMCHECK(nodes_by_pe);
365   for (i=0; i<_Cmi_numnodes; i++) {
366     OtherNode node = nodes + i;
367     OtherNode_init(node);
368     for (j=0; j<node->nodesize; j++)
369       nodes_by_pe[j + node->nodestart] = node;
370   }
371 #ifdef CMK_CPV_IS_SMP
372   /* index for communication threads */
373   for (i=_Cmi_numpes; i<_Cmi_numpes+_Cmi_numnodes; i++) {
374     OtherNode node = nodes + i-_Cmi_numpes;
375     nodes_by_pe[i] = node;
376   }
377 #endif
378 }
379
380 /**
381  * Printing Net Statistics -- milind
382  */
383 static char statstr[10000];
384
385 void printNetStatistics(void)
386 {
387   char tmpstr[1024];
388   OtherNode myNode;
389   int i;
390   unsigned int send_pkt=0, resend_pkt=0, recv_pkt=0, send_ack=0;
391   unsigned int recv_ack=0, ack_pkts=0;
392
393   myNode = nodes+CmiMyNode();
394   sprintf(tmpstr, "***********************************\n");
395   strcpy(statstr, tmpstr);
396   sprintf(tmpstr, "Net Statistics For Node %u\n", CmiMyNode());
397   strcat(statstr, tmpstr);
398   sprintf(tmpstr, "Interrupts: %u \tProcessed: %u\n",
399                   myNode->stat_total_intr, myNode->stat_proc_intr);
400   strcat(statstr, tmpstr);
401   sprintf(tmpstr, "Total Msgs Sent: %u \tTotal Bytes Sent: %u\n",
402                   myNode->sent_msgs, myNode->sent_bytes);
403   strcat(statstr, tmpstr);
404   sprintf(tmpstr, "Total Msgs Recv: %u \tTotal Bytes Recv: %u\n",
405                   myNode->recd_msgs, myNode->recd_bytes);
406   strcat(statstr, tmpstr);
407   sprintf(tmpstr, "***********************************\n");
408   strcat(statstr, tmpstr);
409   sprintf(tmpstr, "[Num]\tSENDTO\tRESEND\tRECV\tACKSTO\tACKSFRM\tPKTACK\n");
410   strcat(statstr,tmpstr);
411   sprintf(tmpstr, "=====\t======\t======\t====\t======\t=======\t======\n");
412   strcat(statstr,tmpstr);
413   for(i=0;i<CmiNumNodes();i++) {
414     OtherNode node = nodes+i;
415     sprintf(tmpstr, "[%u]\t%u\t%u\t%u\t%u\t%u\t%u\n",
416                      i, node->stat_send_pkt, node->stat_resend_pkt,
417                      node->stat_recv_pkt, node->stat_send_ack,
418                      node->stat_recv_ack, node->stat_ack_pkts);
419     strcat(statstr, tmpstr);
420     send_pkt += node->stat_send_pkt;
421     recv_pkt += node->stat_recv_pkt;
422     resend_pkt += node->stat_resend_pkt;
423     send_ack += node->stat_send_ack;
424     recv_ack += node->stat_recv_ack;
425     ack_pkts += node->stat_ack_pkts;
426   }
427   sprintf(tmpstr, "[TOTAL]\t%u\t%u\t%u\t%u\t%u\t%u\n",
428                      send_pkt, resend_pkt,
429                      recv_pkt, send_ack,
430                      recv_ack, ack_pkts);
431   strcat(statstr, tmpstr);
432   sprintf(tmpstr, "***********************************\n");
433   strcat(statstr, tmpstr);
434   CmiPrintf(statstr);
435 }
436
437
438 /************** free list management *****************/
439
440 static ExplicitDgram Cmi_freelist_explicit;
441 static ImplicitDgram Cmi_freelist_implicit;
442 /*static OutgoingMsg   Cmi_freelist_outgoing;*/
443
444 #define FreeImplicitDgram(dg) {\
445   ImplicitDgram d=(dg);\
446   d->next = Cmi_freelist_implicit;\
447   Cmi_freelist_implicit = d;\
448 }
449
450 #define MallocImplicitDgram(dg) {\
451   ImplicitDgram d = Cmi_freelist_implicit;\
452   if (d==0) {d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
453              _MEMCHECK(d);\
454   } else Cmi_freelist_implicit = d->next;\
455   dg = d;\
456 }
457
458 #define FreeExplicitDgram(dg) {\
459   ExplicitDgram d=(dg);\
460   d->next = Cmi_freelist_explicit;\
461   Cmi_freelist_explicit = d;\
462 }
463
464 #define MallocExplicitDgram(dg) {\
465   ExplicitDgram d = Cmi_freelist_explicit;\
466   if (d==0) { d = ((ExplicitDgram)malloc \
467                    (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
468               _MEMCHECK(d);\
469   } else Cmi_freelist_explicit = d->next;\
470   dg = d;\
471 }
472
473 /* Careful with these next two, need concurrency control */
474
475 #define FreeOutgoingMsg(m) (free(m))
476 #define MallocOutgoingMsg(m)\
477     {(m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct))); _MEMCHECK(m);}
478
479 /****************************************************************************
480  *                                                                          
481  * CheckSocketsReady
482  *
483  * Checks both sockets to see which are readable and which are writeable.
484  * We check all these things at the same time since this can be done for
485  * free with ``select.'' The result is stored in global variables, since
486  * this is essentially global state information and several routines need it.
487  *
488  ***************************************************************************/
489
490 static int ctrlskt_ready_read;
491 static int dataskt_ready_read;
492 static int dataskt_ready_write;
493
494 /******************************************************************************
495  *
496  * Transmission Code
497  *
498  *****************************************************************************/
499
500 void GarbageCollectMsg(OutgoingMsg ogm)
501 {
502   if (ogm->refcount == 0) {
503     if (ogm->freemode == 'A') {
504       ogm->freemode = 'X';
505     } else {
506       CmiFree(ogm->data);
507       FreeOutgoingMsg(ogm);
508     }
509   }
510 }
511
512 void DiscardImplicitDgram(ImplicitDgram dg)
513 {
514   OutgoingMsg ogm;
515   ogm = dg->ogm;
516   ogm->refcount--;
517   GarbageCollectMsg(ogm);
518   FreeImplicitDgram(dg);
519 }
520
521 /*
522  Check the real-time clock and perform periodic tasks.
523  Must be called with comm. lock held.
524  */
525 static double Cmi_ack_last, Cmi_check_last;
526 static void CommunicationsClock(void)
527 {
528   MACHSTATE(1,"CommunicationsClock");
529   Cmi_clock = GetClock();
530   if (Cmi_clock > Cmi_ack_last + 0.5*Cmi_ack_delay) {
531     MACHSTATE(4,"CommunicationsClock timing out acks");    
532     Cmi_ack_last=Cmi_clock;
533     writeableAcks=1;
534     writeableDgrams=1;
535   }
536   
537   if (Cmi_clock > Cmi_check_last + Cmi_check_delay) {
538     MACHSTATE(4,"CommunicationsClock pinging charmrun");       
539     Cmi_check_last = Cmi_clock; 
540     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
541   }
542 }
543
544 #if CMK_SHARED_VARS_UNAVAILABLE
545 static void CommunicationsClockCaller(void *ignored)
546 {
547   CmiCommLock();
548   CommunicationsClock();
549   CmiCommUnlock();
550   CcdCallFnAfter(CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);  
551 }
552
553 static void CommunicationPeriodic(void) 
554 { /*Poll on the communications server*/
555   CommunicationServerThread(0);
556 }
557
558 static void CommunicationPeriodicCaller(void *ignored)
559 {
560   CommunicationPeriodic();
561   CcdCallFnAfter(CommunicationPeriodicCaller,NULL,Cmi_comm_periodic_delay);
562 }
563 #endif
564
565 #if CMK_USE_GM
566
567 #include "machine-gm.c"
568
569 #elif CMK_USE_TCP
570
571 #include "machine-tcp.c"
572
573 #else
574
575 #include "machine-eth.c"
576
577 #endif