/* charm.git: src/arch/pami/manytomany.c
 * Commit: add abort to avoid the SMP many-to-many hang without async
 */

#include "converse.h"
#include "cmidirectmanytomany.h"
#define MAX_CONN   8

#define M2M_PAMI_S8DISPATCH 13
#define M2M_PAMI_SDISPATCH  14
#define M2M_PAMI_DISPATCH   15

typedef struct _pami_m2mhdr {
  int8_t    dstrank;
  int8_t    connid;
  int32_t   srcindex;
} PAMI_M2mHeader;

typedef struct _pami_m2m_work {
  pami_work_t    work;
  int            start;
  int            end;
  void         * handle;
  pami_context_t context;
} PAMI_M2mWork_t;

typedef struct _m2m_completionmsg {
  char  hdr [CmiMsgHeaderSizeBytes];
  void  *handle;
  int    rank;
} M2mCompletionMsg;

#define MAX_NWORK 8

typedef struct _pami_cmidhandle {
  int                   myrank;
  unsigned              m2m_rcvcounter;
  unsigned              m2m_nzrcvranks;
  char                * m2m_rcvbuf;
  unsigned            * m2m_rcvlens;
  unsigned            * m2m_rdispls;

  unsigned              m2m_nsndranks;
  unsigned              m2m_srankIndex;
  char                * m2m_sndbuf;
  unsigned            * m2m_sndlens;
  unsigned            * m2m_sdispls;
  unsigned              m2m_sndcounter;
  unsigned            * m2m_permutation;
  unsigned            * m2m_lranks;
  pami_endpoint_t     * m2m_node_eps;

  PAMI_M2mWork_t        swork[MAX_NWORK];
  int                   n_work;

  CmiDirectM2mHandler   m2m_rdone;
  void                * m2m_rdonecontext;
  PAMI_M2mHeader      * m2m_hdrs;
  M2mCompletionMsg      cmsg;

  unsigned              m2m_ntotalrcvranks;
  unsigned              m2m_initialized;
  unsigned              m2m_rrankIndex;
  CmiDirectM2mHandler   m2m_sdone;
  void                * m2m_sdonecontext;
} PAMICmiDirectM2mHandle;

CpvDeclare(PAMICmiDirectM2mHandle*, _handle);
CpvDeclare(int, _completion_handler);

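/*
 * Overview: CmiDirect many-to-many over PAMI.  A connection is identified by
 * (handle, tag); each PE sets up a receive side (buffer plus per-source
 * lengths and displacements) and a send side (buffer plus per-destination
 * lengths, displacements and endpoints), then CmiDirect_manytomany_start()
 * posts all sends.  Completion callbacks fire once all sends, respectively
 * all non-zero-length receives, of the connection have finished.
 *
 * Illustrative caller-side sketch (the buffer, callback, and index names
 * below are hypothetical, not part of this file):
 *
 *   void *h = CmiDirect_manytomany_allocate_handle ();
 *   CmiDirect_manytomany_initialize_recvbase (h, tag, rdone_cb, ctx, rcvbuf, nrecv, myRecvIdx);
 *   for (i = 0; i < nrecv; ++i)
 *     CmiDirect_manytomany_initialize_recv (h, tag, i, rdispls[i], rlens[i], srcpe[i]);
 *   CmiDirect_manytomany_initialize_sendbase (h, tag, sdone_cb, ctx, sndbuf, nsend, mySendIdx);
 *   for (i = 0; i < nsend; ++i)
 *     CmiDirect_manytomany_initialize_send (h, tag, i, sdispls[i], slens[i], dstpe[i]);
 *   CmiDirect_manytomany_start (h, tag);
 */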
static void m2m_recv_done(pami_context_t ctxt, void *clientdata, pami_result_t result)
{
  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;
  //acquire lock if processed by many comm threads and contexts?
  handle->m2m_rcvcounter ++;

  if (handle->m2m_rcvcounter == handle->m2m_nzrcvranks) {
    //printf ("Calling manytomany rdone for handle %p on rank %d counter %d nexp %d\n",
    //    handle, CmiMyPe(),
    //    handle->m2m_rcvcounter, handle->m2m_nzrcvranks);
    handle->m2m_rcvcounter = 0;
#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
    //Called from comm thread
    CmiSendPeer (handle->myrank, sizeof(M2mCompletionMsg), (char*)&handle->cmsg);
#else
    if (handle->m2m_rdone)
      handle->m2m_rdone(handle->m2m_rdonecontext);
#endif
  }
}

static void m2m_send_done(pami_context_t ctxt, void *clientdata, pami_result_t result)
{
  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;
  //acquire lock if processed by many comm threads and contexts?
  handle->m2m_sndcounter ++;
  if (handle->m2m_sndcounter == handle->m2m_nsndranks) {
    //if in comm thread send a converse message
    //else
    handle->m2m_sndcounter = 0;
    if (handle->m2m_sdone)
      handle->m2m_sdone(handle->m2m_sdonecontext);
  }
}

static void m2m_rdone_mainthread (void *m) {
  M2mCompletionMsg *msg = (M2mCompletionMsg *) m;
  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)msg->handle;
  if (handle->m2m_rdone)
    handle->m2m_rdone(handle->m2m_rdonecontext);
}

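/*
 * Receive-side dispatch handlers, one per dispatch id:
 *   m2m_s8_dispatch   - exactly 8-byte payloads, copied with one 64-bit store
 *   m2m_spkt_dispatch - short payloads delivered fully in pipe_addr (memcpy)
 *   m2m_pkt_dispatch  - general case; posts a pami_recv_t when the payload is
 *                       not yet available
 * Each handler locates the destination handle from the header (dstrank,
 * connid), copies the payload to rcvbuf + rdispls[srcindex], and advances the
 * receive counter via m2m_recv_done().
 */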
static void m2m_s8_dispatch (pami_context_t       context,
                             void               * clientdata,
                             const void         * header_addr,
                             size_t               header_size,
                             const void         * pipe_addr,
                             size_t               pipe_size,
                             pami_endpoint_t      origin,
                             pami_recv_t        * recv)
{
  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);
  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];

  //Copy 8 bytes
  *(uint64_t *)buffer = *(uint64_t*)pipe_addr;
  m2m_recv_done (context, handle, PAMI_SUCCESS);
}


static void m2m_spkt_dispatch (pami_context_t       context,
                               void               * clientdata,
                               const void         * header_addr,
                               size_t               header_size,
                               const void         * pipe_addr,
                               size_t               pipe_size,
                               pami_endpoint_t      origin,
                               pami_recv_t        * recv)
{
  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);
  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];

  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
  memcpy (buffer, pipe_addr, pipe_size);
  m2m_recv_done (context, handle, PAMI_SUCCESS);
}



static void m2m_pkt_dispatch (pami_context_t       context,
                              void               * clientdata,
                              const void         * header_addr,
                              size_t               header_size,
                              const void         * pipe_addr,
                              size_t               pipe_size,
                              pami_endpoint_t      origin,
                              pami_recv_t        * recv)
{
  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;

  //CmiAssert (hdr->dstrank < CmiMyNodeSize());
  //CmiAssert (hdr->connid  < MAX_CONN);

  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);
  //CmiAssert (handlevec != NULL);

  //fprintf(stderr, "m2m_pkt_dispatch: mype %d connid %d dstrank %d handlevec %p\n",
  //  CmiMyPe(), hdr->connid, hdr->dstrank, handlevec);

  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];

  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];

  if (recv) {
    recv->local_fn = m2m_recv_done;
    recv->cookie   = handle;
    recv->type     = PAMI_TYPE_BYTE;
    recv->addr     = buffer;
    recv->offset   = 0;
    recv->data_fn  = PAMI_DATA_COPY;
  }
  else {
    memcpy (buffer, pipe_addr, pipe_size);
    m2m_recv_done (context, handle, PAMI_SUCCESS);
  }
}


void * CmiDirect_manytomany_allocate_handle () {
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
  return NULL;  //not reached; keeps the non-void function well formed
#else
  if (!CpvInitialized(_handle))
    CpvInitialize(PAMICmiDirectM2mHandle*, _handle);
  if (!CpvInitialized(_completion_handler))
    CpvInitialize(int, _completion_handler);
  ppc_msync();

  if (CpvAccess(_handle) == NULL) {
    CpvAccess(_handle) = (PAMICmiDirectM2mHandle *)malloc (MAX_CONN *sizeof(PAMICmiDirectM2mHandle));
    memset (CpvAccess(_handle),0,MAX_CONN*sizeof (PAMICmiDirectM2mHandle));
    CpvAccess(_completion_handler) = CmiRegisterHandler(m2m_rdone_mainthread);
  }

  //printf ("allocate_handle on rank %d %p\n", CmiMyPe(), CpvAccess(_handle));
  return CpvAccess(_handle);
#endif
}


void   CmiDirect_manytomany_initialize_recvbase(void                 * h,
                                                unsigned               tag,
                                                CmiDirectM2mHandler    donecb,
                                                void                 * context,
                                                char                 * rcvbuf,
                                                unsigned               nranks,
                                                unsigned               myIdx )
{
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
#else
  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
  //PAMICmiDirectM2mHandle *handle = &(CpvAccess(_handle)[tag]);

  //printf ("manytomany recvbase on rank %d handle %p conn %d nranks %d\n",
  //  CmiMyPe(), handle, tag, nranks);

  handle->myrank = CmiMyRank();
  handle->cmsg.handle = handle;
  CmiSetHandler (&handle->cmsg, CpvAccess(_completion_handler));

  handle->m2m_initialized = 1;
  assert ( tag < MAX_CONN  );
  handle->m2m_rcvbuf   = rcvbuf;

  handle->m2m_rdone        = donecb;
  handle->m2m_rdonecontext = context;
  handle->m2m_ntotalrcvranks    = nranks;

  //Receiver is not sender
  //if (myIdx == (unsigned)-1)
  //(handle->m2m_ntotalrcvranks)++;

  handle->m2m_rcvlens   = (unsigned int *) malloc (sizeof(unsigned int) * handle->m2m_ntotalrcvranks);
  handle->m2m_rdispls   = (unsigned int *) malloc (sizeof(unsigned int) * handle->m2m_ntotalrcvranks);

  assert (handle->m2m_rcvlens != NULL);
  assert (handle->m2m_rdispls != NULL);

  memset (handle->m2m_rcvlens, 0, handle->m2m_ntotalrcvranks * sizeof(unsigned int));
  memset (handle->m2m_rdispls, 0, handle->m2m_ntotalrcvranks * sizeof(unsigned int));

  //Receiver is not sender
  //if (myIdx == (unsigned)-1) {
  //Receiver doesn't send any data
  //  myIdx =     handle->m2m_ntotalrcvranks - 1;
  //CmiDirect_manytomany_initialize_recv (h, tag,  myIdx, 0, 0, CmiMyPe());
  //}
  handle->m2m_rrankIndex = myIdx;
#endif
}

void   CmiDirect_manytomany_initialize_recv ( void          * h,
                                              unsigned        tag,
                                              unsigned        idx,
                                              unsigned        displ,
                                              unsigned        bytes,
                                              unsigned        rank )
{
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
#else
  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
  assert ( tag < MAX_CONN  );

  if (handle->m2m_rcvlens[idx] == 0 && bytes > 0)
    handle->m2m_nzrcvranks ++;

  handle->m2m_rcvlens  [idx]   = bytes;
  handle->m2m_rdispls  [idx]   = displ;
#endif
}


void   CmiDirect_manytomany_initialize_sendbase( void                 * h,
                                                 unsigned               tag,
                                                 CmiDirectM2mHandler    donecb,
                                                 void                 * context,
                                                 char                 * sndbuf,
                                                 unsigned               nranks,
                                                 unsigned               myIdx )
{
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
#else
  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
  assert ( tag < MAX_CONN  );
  handle->m2m_sndbuf       = sndbuf;
  handle->m2m_sdone        = donecb;
  handle->m2m_sdonecontext = context;

  handle->m2m_nsndranks    = nranks;
  handle->m2m_srankIndex   = myIdx;
  handle->m2m_sndlens      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
  handle->m2m_sdispls      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
  handle->m2m_lranks       = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
  handle->m2m_node_eps     = (pami_endpoint_t *) malloc (sizeof(pami_endpoint_t) * nranks);
  handle->m2m_permutation  = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
  handle->m2m_hdrs = (PAMI_M2mHeader *) malloc(sizeof(PAMI_M2mHeader) * nranks);

  memset (handle->m2m_sndlens,    0, nranks * sizeof(unsigned int));
  memset (handle->m2m_sdispls,    0, nranks * sizeof(unsigned int));
  memset (handle->m2m_lranks,     0, nranks * sizeof(unsigned int));
  memset (handle->m2m_node_eps,   0, nranks * sizeof(pami_endpoint_t));
  memset (handle->m2m_permutation,0, nranks * sizeof(unsigned int));

#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
  //we have a completion callback
  if (handle->m2m_sdone != NULL) {
    handle->swork[0].start = 0;
    handle->swork[0].end   = handle->m2m_nsndranks;
    handle->swork[0].handle = handle;
    handle->n_work = 1;

    int context_id = MY_CONTEXT_ID();
    context_id ++;
    if (context_id >= cmi_pami_numcontexts)
      context_id = 0;
    pami_context_t context = cmi_pami_contexts[context_id];
    handle->swork[0].context = context;
  }
  else {
    int i = 0;
    int context_id = MY_CONTEXT_ID();
    pami_context_t context = NULL;
    int start = 0, nranks = 0;
    int ncontexts = cmi_pami_numcontexts;
    if (ncontexts > MAX_NWORK)
      ncontexts = MAX_NWORK;
    if (ncontexts > handle->m2m_nsndranks)
      ncontexts = handle->m2m_nsndranks;
    handle->n_work = ncontexts;

    nranks = handle->m2m_nsndranks / ncontexts;
    for (i = 0; i < ncontexts; ++i) {
      handle->swork[i].start  = start;
      handle->swork[i].end    = start + nranks;
      handle->swork[i].handle = handle;
      start += nranks;
      if (i == ncontexts - 1)
        handle->swork[i].end  = handle->m2m_nsndranks;

      context_id ++;
      if (context_id >= cmi_pami_numcontexts)
        context_id = 0;
      context = cmi_pami_contexts[context_id];
      handle->swork[i].context = context;
    }
  }
#else
  PAMIX_CONTEXT_LOCK(MY_CONTEXT());
  handle->swork[0].start = 0;
  handle->swork[0].end   = handle->m2m_nsndranks;
  handle->swork[0].handle = handle;
  handle->n_work = 1;
  handle->swork[0].context = MY_CONTEXT();
  PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
#endif
#endif
}

#define PRIME_A  3010349UL
#define PRIME_B  3571UL

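/*
 * PRIME_A / PRIME_B appear to be leftovers from an earlier hash-based
 * permutation (see the commented-out p_rand expression below).  The current
 * code instead starts each sender at a pseudo-random offset derived from its
 * PE number and walks the destination list linearly from there, so that
 * concurrent senders do not all target the same destination at once.
 */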
void   CmiDirect_manytomany_initialize_send ( void        * h,
                                              unsigned      tag,
                                              unsigned      idx,
                                              unsigned      displ,
                                              unsigned      bytes,
                                              unsigned      pe )
{
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
#else
  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
  assert ( tag < MAX_CONN  );
  handle->m2m_sndlens    [idx]   = bytes;
  handle->m2m_sdispls    [idx]   = displ;

  int lrank                      = CmiRankOf(pe);
  handle->m2m_lranks     [idx]   = lrank;

  pami_endpoint_t target;
  //get the destination context
#if CMK_PAMI_MULTI_CONTEXT
  size_t dst_context = (lrank>>LTPS);
#else
  size_t dst_context = 0;
#endif
  PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)CmiNodeOf(pe),
                        dst_context, &target);
  handle->m2m_node_eps   [idx]   = target;

  //uint64_t p_rand = ((uint64_t)idx+1)*PRIME_A + PRIME_B*(CmiMyPe()+1);
  unsigned seed = CmiMyPe()+1;
  //start at a random location and move linearly from there
  uint64_t p_rand = rand_r(&seed) + idx + 1;
  //uint64_t p_rand = (uint64_t)idx + 1 + CmiMyPe();
  //uint64_t p_rand   =  idx + 1;
  handle->m2m_permutation[idx]   = (uint32_t)(p_rand%handle->m2m_nsndranks);
  handle->m2m_hdrs[idx].connid   = tag;
  handle->m2m_hdrs[idx].dstrank  = lrank;
  handle->m2m_hdrs[idx].srcindex = handle->m2m_srankIndex;
#endif
}

static void  _internal_machine_send   ( pami_context_t      context,
                                        pami_endpoint_t     target_ep,
                                        int                 rank,
                                        int                 hdrsize,
                                        char              * hdr,
                                        int                 size,
                                        char              * msg,
                                        pami_event_function cb_done,
                                        void              * cd)
{
  if (size < 128) {
    pami_send_immediate_t parameters;
    parameters.dispatch        = (size == 8)? M2M_PAMI_S8DISPATCH : M2M_PAMI_SDISPATCH;
    //parameters.dispatch        = M2M_PAMI_SDISPATCH;
    parameters.header.iov_base = hdr;
    parameters.header.iov_len  = hdrsize;
    parameters.data.iov_base   = msg;
    parameters.data.iov_len    = size;
    parameters.dest            = target_ep;

    PAMI_Send_immediate (context, &parameters);
    //if (cb_done)
    //cb_done (context, cd, PAMI_SUCCESS);
  }
  else {
    pami_send_t parameters;
    parameters.send.dispatch        = M2M_PAMI_DISPATCH;
    parameters.send.header.iov_base = hdr;
    parameters.send.header.iov_len  = hdrsize;
    parameters.send.data.iov_base   = msg;
    parameters.send.data.iov_len    = size;
    parameters.events.cookie        = cd;
    parameters.events.local_fn      = cb_done;
    parameters.events.remote_fn     = NULL;
    memset(&parameters.send.hints, 0, sizeof(parameters.send.hints));
    parameters.send.dest            = target_ep;

    PAMI_Send (context, &parameters);
  }
}

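/*
 * _internal_machine_send (above) chooses the transport by payload size:
 * payloads under 128 bytes go out via PAMI_Send_immediate (8-byte payloads
 * use the dedicated M2M_PAMI_S8DISPATCH id), larger ones via PAMI_Send with a
 * local completion callback.  Note that the immediate path leaves cb_done
 * uninvoked (the call is commented out), so send completions are only counted
 * on the PAMI_Send path.
 *
 * _cmidirect_m2m_send_post_handler (below) is the work function posted to a
 * PAMI context; it walks its [start, end) slice of destinations in the
 * permuted order and issues one send per destination.
 */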
pami_result_t   _cmidirect_m2m_send_post_handler (pami_context_t     context,
                                                  void             * cd)
{
  PAMI_M2mWork_t  *work = (PAMI_M2mWork_t *) cd;
  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)work->handle;

  int i = 0;
  int pidx = 0;
  char *buffer = NULL;
  int bytes = 0;

  pami_event_function cb_done = m2m_send_done;
  void *clientdata = handle;

  if (handle->m2m_sdone == NULL) {
    cb_done     = NULL;
    clientdata  = NULL;
  }

  for (i = work->start; i < work->end; ++i) {
    pidx   = handle->m2m_permutation[i];
    buffer = handle->m2m_sndbuf + handle->m2m_sdispls[pidx];
    bytes  = handle->m2m_sndlens[pidx];

    _internal_machine_send(context,
                           handle->m2m_node_eps[pidx],
                           handle->m2m_lranks[pidx],
                           sizeof(PAMI_M2mHeader),
                           (char*)&(handle->m2m_hdrs[pidx]),
                           bytes,
                           buffer,
                           cb_done,
                           clientdata);
  }

  return PAMI_SUCCESS;
}


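/*
 * Registers the three many-to-many dispatch ids (M2M_PAMI_DISPATCH,
 * M2M_PAMI_SDISPATCH, M2M_PAMI_S8DISPATCH) on each of the nc contexts passed
 * in; expected to be called once during machine-layer initialization so that
 * every context can receive many-to-many traffic.
 */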
void _cmidirect_m2m_initialize (pami_context_t *contexts, int nc) {
  pami_dispatch_hint_t options = (pami_dispatch_hint_t) {0};
  pami_dispatch_callback_function pfn;
  int i = 0;
  for (i = 0; i < nc; ++i) {
    pfn.p2p = m2m_pkt_dispatch;
    PAMI_Dispatch_set (contexts[i],
                       M2M_PAMI_DISPATCH,
                       pfn,
                       NULL,
                       options);

    pfn.p2p = m2m_spkt_dispatch;
    PAMI_Dispatch_set (contexts[i],
                       M2M_PAMI_SDISPATCH,
                       pfn,
                       NULL,
                       options);

    pfn.p2p = m2m_s8_dispatch;
    PAMI_Dispatch_set (contexts[i],
                       M2M_PAMI_S8DISPATCH,
                       pfn,
                       NULL,
                       options);
  }
}


void   CmiDirect_manytomany_start ( void       * h,
                                    unsigned     tag ) {
#if CMK_SMP && !CMK_ENABLE_ASYNC_PROGRESS
  CmiAbort("Please build Charm++ with async progress in order to use the many-to-many interface\n");
#else
  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
  assert (tag < MAX_CONN);

  //printf ("Calling manytomany_start for conn %d handle %p on rank %d\n", tag,
  //  handle, CmiMyPe());

#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
  //we have a completion callback
  if (handle->m2m_sdone != NULL) {
    PAMI_Context_post ( handle->swork[0].context,
                       &handle->swork[0].work,
                       _cmidirect_m2m_send_post_handler,
                       &handle->swork[0]);
  }
  else {
    int i;
    for (i = 0; i < handle->n_work; ++i) {
      PAMI_Context_post( handle->swork[i].context,
                        &handle->swork[i].work,
                        _cmidirect_m2m_send_post_handler,
                        &handle->swork[i]);
    }
  }
#else
  PAMIX_CONTEXT_LOCK(MY_CONTEXT());
  _cmidirect_m2m_send_post_handler (MY_CONTEXT(), &handle->swork[0]);
  PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
#endif
#endif
}