730d0fb9f8427736de65cb5ca5b008cb39d8a289
[charm.git] / src / arch / pamilrts / manytomany.c
1
2 #include "converse.h"
3 #include "cmidirectmanytomany.h"
// Number of many-to-many connections (tags) held in each per-PE handle array.
#define MAX_CONN   8

// PAMI dispatch ids used by this file; _internal_machine_send picks one
// per message based on the payload size.
#define M2M_PAMI_S8DISPATCH 13   // payload of exactly 8 bytes
#define M2M_PAMI_SDISPATCH  14   // any other payload shorter than 128 bytes
#define M2M_PAMI_DISPATCH   15   // payload of 128 bytes or more
9
// Header transmitted with every many-to-many payload. The receiving
// dispatch functions use it to find the destination handle and the slot
// of the receive buffer that the payload belongs to.
typedef struct _pami_m2mhdr {
  int8_t    dstrank;    // CmiRankOf() of the destination PE; selects the per-PE handle array
  int8_t    connid;     // connection tag; index into that handle array
  int32_t   srcindex;   // sender's index into the receiver's m2m_rdispls table
} PAMI_M2mHeader;
15
16 typedef struct _pami_m2m_work {
17   pami_work_t    work;
18   int            start;
19   int            end;
20   void         * handle;
21   pami_context_t context;
22 } PAMI_M2mWork_t;
23
24 typedef struct _m2m_completionmsg {
25   char  hdr [CmiMsgHeaderSizeBytes];
26   void  *handle;
27   int    rank;
28 } M2mCompletionMsg;
29
30 #define MAX_NWORK 8
31
32 typedef struct _pami_cmidhandle {
33   int                   myrank;
34   unsigned              m2m_rcvcounter ;
35   unsigned              m2m_nzrcvranks;  
36   char                * m2m_rcvbuf     ;
37   unsigned            * m2m_rcvlens    ;
38   unsigned            * m2m_rdispls    ;
39
40   unsigned              m2m_nsndranks;
41   unsigned              m2m_srankIndex;               
42   char                * m2m_sndbuf     ;
43   unsigned            * m2m_sndlens    ;
44   unsigned            * m2m_sdispls    ;
45   unsigned              m2m_sndcounter ;
46   unsigned            * m2m_permutation;
47   unsigned            * m2m_lranks     ;
48   pami_endpoint_t     * m2m_node_eps;
49
50   PAMI_M2mWork_t        swork[MAX_NWORK];  
51   int                   n_work;
52
53   CmiDirectM2mHandler   m2m_rdone;
54   void                * m2m_rdonecontext;
55   PAMI_M2mHeader      * m2m_hdrs;
56   M2mCompletionMsg      cmsg;
57
58   unsigned              m2m_ntotalrcvranks;
59   unsigned              m2m_initialized;  
60   unsigned              m2m_rrankIndex; 
61   CmiDirectM2mHandler   m2m_sdone;
62   void                * m2m_sdonecontext;
63 } PAMICmiDirectM2mHandle;  
64
65 CpvDeclare(PAMICmiDirectM2mHandle*, _handle);
66 CpvDeclare(int, _completion_handler);
67
68 static void m2m_recv_done(pami_context_t ctxt, void *clientdata, pami_result_t result) 
69 {
70   PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;  
71   //acquire lock if processed by many comm threads and contexts?
72   handle->m2m_rcvcounter ++;
73     
74   if (handle->m2m_rcvcounter == handle->m2m_nzrcvranks) {
75     //printf ("Calling manytomany rdone for handle %p on rank %d counter %d nexp %d\n", 
76     //    handle, CmiMyPe(),
77     //    handle->m2m_rcvcounter, handle->m2m_nzrcvranks);
78     handle->m2m_rcvcounter = 0;
79 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
80     //Called from comm thread
81     CmiSendPeer (handle->myrank, sizeof(M2mCompletionMsg), (char*)&handle->cmsg);
82 #else
83     if (handle->m2m_rdone)
84       handle->m2m_rdone(handle->m2m_rdonecontext);
85 #endif
86   }
87 }
88
89 static void m2m_send_done(pami_context_t ctxt, void *clientdata, pami_result_t result) 
90 {
91   PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;  
92   //acquire lock if processed by many comm threads and contexts?
93   handle->m2m_sndcounter ++;
94   if (handle->m2m_sndcounter == handle->m2m_nsndranks) {
95     //if in comm thread send a converse message
96     //else
97     handle->m2m_sndcounter = 0;
98     if (handle->m2m_sdone)
99       handle->m2m_sdone(handle->m2m_sdonecontext); 
100   }
101 }
102
103 static void m2m_rdone_mainthread (void *m) {
104   M2mCompletionMsg *msg = (M2mCompletionMsg *) m;
105   PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)msg->handle;
106   if (handle->m2m_rdone)
107     handle->m2m_rdone(handle->m2m_rdonecontext);
108 }
109
110 static void m2m_s8_dispatch (pami_context_t       context,  
111                              void               * clientdata,
112                              const void         * header_addr, 
113                              size_t               header_size, 
114                              const void         * pipe_addr,   
115                              size_t               pipe_size,   
116                              pami_endpoint_t      origin,
117                              pami_recv_t         * recv)       
118 {
119   PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
120   PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);  
121   PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
122   char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
123
124   //Copy 8 bytes
125   *(uint64_t *)buffer = *(uint64_t*)pipe_addr;
126   m2m_recv_done (context, handle, PAMI_SUCCESS);
127 }
128
129
130 static void m2m_spkt_dispatch (pami_context_t       context,  
131                               void               * clientdata,
132                               const void         * header_addr, 
133                               size_t               header_size, 
134                               const void         * pipe_addr,   
135                               size_t               pipe_size,   
136                               pami_endpoint_t      origin,
137                               pami_recv_t         * recv)       
138 {
139   PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
140   PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);   
141   PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
142
143   char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
144   memcpy (buffer, pipe_addr, pipe_size);
145   m2m_recv_done (context, handle, PAMI_SUCCESS);
146 }
147
148
149
150 static void m2m_pkt_dispatch (pami_context_t       context,  
151                               void               * clientdata,
152                               const void         * header_addr, 
153                               size_t               header_size, 
154                               const void         * pipe_addr,   
155                               size_t               pipe_size,   
156                               pami_endpoint_t      origin,
157                               pami_recv_t         * recv)       
158 {
159   PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
160
161   //CmiAssert (hdr->dstrank < CmiMyNodeSize());
162   //CmiAssert (hdr->connid  < MAX_CONN);
163
164   PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);
165   //CmiAssert (handlevec != NULL);
166   
167   //fprintf(stderr, "m2m_pkt_dispatch: mype %d connid %d dstrank %d handlevec %p\n",
168   //  CmiMyPe(), hdr->connid, hdr->dstrank, handlevec);
169   
170   PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
171
172   char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
173
174   if (recv) {
175     recv->local_fn = m2m_recv_done;
176     recv->cookie   = handle;
177     recv->type     = PAMI_TYPE_BYTE;
178     recv->addr     = buffer;
179     recv->offset   = 0;
180     recv->data_fn  = PAMI_DATA_COPY;
181   }
182   else {
183     memcpy (buffer, pipe_addr, pipe_size);
184     m2m_recv_done (context, handle, PAMI_SUCCESS);
185   }
186 }
187
188
189 void * CmiDirect_manytomany_allocate_handle () {  
190   if (!CpvInitialized(_handle))
191     CpvInitialize(PAMICmiDirectM2mHandle*, _handle);
192   if (!CpvInitialized(_completion_handler))
193     CpvInitialize(int, _completion_handler);  
194   ppc_msync();
195   
196   if (CpvAccess(_handle) == NULL) {
197     CpvAccess(_handle) = (PAMICmiDirectM2mHandle *)malloc (MAX_CONN *sizeof(PAMICmiDirectM2mHandle));
198     memset (CpvAccess(_handle),0,MAX_CONN*sizeof (PAMICmiDirectM2mHandle));
199     CpvAccess(_completion_handler) = CmiRegisterHandler(m2m_rdone_mainthread);
200   }
201   
202   //printf ("allocate_handle on rank %d %p\n", CmiMyPe(), CpvAccess(_handle));
203   return CpvAccess(_handle);
204 }
205
206
207 void   CmiDirect_manytomany_initialize_recvbase(void                 * h,
208                                                 unsigned               tag,
209                                                 CmiDirectM2mHandler    donecb,
210                                                 void                 * context,
211                                                 char                 * rcvbuf,
212                                                 unsigned               nranks,
213                                                 unsigned               myIdx )
214 {
215   PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
216   //PAMICmiDirectM2mHandle *handle = &(CpvAccess(_handle)[tag]);
217
218   //printf ("manytomany recvbase on rank %d handle %p conn %d nranks %d\n", 
219   //  CmiMyPe(), handle, tag, nranks);
220
221   handle->myrank = CmiMyRank();
222   handle->cmsg.handle = handle;
223   CmiSetHandler (&handle->cmsg, CpvAccess(_completion_handler));
224
225   handle->m2m_initialized = 1;
226   assert ( tag < MAX_CONN  );
227   handle->m2m_rcvbuf   = rcvbuf;
228
229   handle->m2m_rdone        = donecb;
230   handle->m2m_rdonecontext = context;
231   handle->m2m_ntotalrcvranks    = nranks;
232   
233   //Receiver is not sender
234   //if (myIdx == (unsigned)-1) 
235   //(handle->m2m_ntotalrcvranks)++;
236     
237   handle->m2m_rcvlens   = malloc (sizeof(int) * handle->m2m_ntotalrcvranks);
238   handle->m2m_rdispls   = malloc (sizeof(int) * handle->m2m_ntotalrcvranks);
239   
240   assert (handle->m2m_rcvlens != NULL);
241   
242   memset (handle->m2m_rcvlens, 0, handle->m2m_ntotalrcvranks * sizeof(int));
243   memset (handle->m2m_rdispls, 0, handle->m2m_ntotalrcvranks * sizeof(int));
244   
245   //Receiver is not sender
246   //if (myIdx == (unsigned)-1) {
247   //Receiver doesnt send any data
248   //  myIdx =     handle->m2m_ntotalrcvranks - 1;
249   //CmiDirect_manytomany_initialize_recv (h, tag,  myIdx, 0, 0, CmiMyPe());
250   //}
251   handle->m2m_rrankIndex = myIdx;
252 }
253
254 void   CmiDirect_manytomany_initialize_recv ( void          * h,
255                                               unsigned        tag,
256                                               unsigned        idx,
257                                               unsigned        displ,
258                                               unsigned        bytes,
259                                               unsigned        rank )
260 {
261   PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
262   assert ( tag < MAX_CONN  );
263   
264   if (handle->m2m_rcvlens[idx] == 0 && bytes > 0)
265     handle->m2m_nzrcvranks ++;
266
267   handle->m2m_rcvlens  [idx]   = bytes;
268   handle->m2m_rdispls  [idx]   = displ;
269 }
270
271
272 void   CmiDirect_manytomany_initialize_sendbase( void                 * h,
273                                                  unsigned               tag,
274                                                  CmiDirectM2mHandler    donecb,
275                                                  void                 * context,
276                                                  char                 * sndbuf,
277                                                  unsigned               nranks,
278                                                  unsigned               myIdx )
279 {
280   PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
281   assert ( tag < MAX_CONN  );
282   handle->m2m_sndbuf       = sndbuf;
283   handle->m2m_sdone        = donecb;
284   handle->m2m_sdonecontext = context;
285   
286   handle->m2m_nsndranks    = nranks;
287   handle->m2m_srankIndex   = myIdx;  
288   handle->m2m_sndlens      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
289   handle->m2m_sdispls      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
290   handle->m2m_lranks       = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
291   handle->m2m_node_eps     = (pami_endpoint_t *) malloc (sizeof(pami_endpoint_t) * nranks);
292   handle->m2m_permutation  = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
293   handle->m2m_hdrs = (PAMI_M2mHeader *) malloc(sizeof(PAMI_M2mHeader) * nranks);
294
295   memset (handle->m2m_sndlens,    0, nranks * sizeof(int));
296   memset (handle->m2m_sdispls,    0, nranks * sizeof(int));
297   memset (handle->m2m_lranks,     0, nranks * sizeof(int));
298   memset (handle->m2m_node_eps,   0, nranks * sizeof(pami_endpoint_t));
299   memset (handle->m2m_permutation,0, nranks * sizeof(int));  
300
301 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
302   //we have a completion callback
303   if (handle->m2m_sdone != NULL) {
304     handle->swork[0].start = 0;
305     handle->swork[0].end   = handle->m2m_nsndranks;   
306     handle->swork[0].handle = handle;
307     handle->n_work = 1;
308
309     int context_id = MY_CONTEXT_ID();
310     context_id ++;
311     if (context_id >= cmi_pami_numcontexts)
312       context_id = 0;         
313     pami_context_t context = cmi_pami_contexts[context_id];    
314     handle->swork[0].context = context;
315   }
316   else {
317     int i = 0;
318     int context_id = MY_CONTEXT_ID();
319     pami_context_t context = NULL;
320     int start = 0, nranks = 0;
321     int ncontexts = cmi_pami_numcontexts;
322     if (ncontexts > MAX_NWORK)
323       ncontexts = MAX_NWORK;
324     if (ncontexts > handle->m2m_nsndranks)
325       ncontexts = handle->m2m_nsndranks;
326     handle->n_work = ncontexts;
327
328     nranks = handle->m2m_nsndranks / ncontexts;   
329     for (i = 0; i < ncontexts; ++i) {
330       handle->swork[i].start  = start;
331       handle->swork[i].end    = start + nranks;   
332       handle->swork[i].handle = handle;
333       start += nranks;
334       if (i == ncontexts - 1)
335         handle->swork[i].end  = handle->m2m_nsndranks;
336
337       context_id ++;
338       if (context_id >= cmi_pami_numcontexts)
339         context_id = 0;       
340       context = cmi_pami_contexts[context_id];
341       handle->swork[i].context = context;
342     }
343   }
344 #else
345   PAMIX_CONTEXT_LOCK(MY_CONTEXT());
346   handle->swork[0].start = 0;
347   handle->swork[0].end   = handle->m2m_nsndranks;   
348   handle->swork[0].handle = handle;
349   handle->n_work = 1;
350   handle->swork[0].context = MY_CONTEXT();
351   PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
352 #endif
353 }
354
355 #define PRIME_A  3010349UL
356 #define PRIME_B  3571UL
357
358 void   CmiDirect_manytomany_initialize_send ( void        * h,
359                                               unsigned      tag, 
360                                               unsigned      idx,
361                                               unsigned      displ,
362                                               unsigned      bytes,
363                                               unsigned      pe )
364 {
365   PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
366   assert ( tag < MAX_CONN  );  
367   handle->m2m_sndlens    [idx]   = bytes;
368   handle->m2m_sdispls    [idx]   = displ;
369   
370   int lrank                      = CmiRankOf(pe);
371   handle->m2m_lranks     [idx]   = lrank;
372   
373   pami_endpoint_t target;
374   //get the destination context
375 #if CMK_PAMI_MULTI_CONTEXT 
376   size_t dst_context = (lrank>>LTPS);
377 #else
378   size_t dst_context = 0;
379 #endif
380   PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)CmiGetNodeGlobal(CmiNodeOf(pe),CmiMyPartition()), 
381                         dst_context, &target);
382   handle->m2m_node_eps   [idx]   = target;
383
384   //uint64_t p_rand = ((uint64_t)idx+1)*PRIME_A + PRIME_B*(CmiMyPe()+1);
385   unsigned seed = CmiMyPe()+1;
386   //start at a random location and move linearly from there
387   uint64_t p_rand = rand_r(&seed) + idx + 1;
388   //uint64_t p_rand = (uint64_t)idx + 1 + CmiMyPe();
389   //uint64_t p_rand   =  idx + 1;
390   handle->m2m_permutation[idx]   = (uint32_t)(p_rand%handle->m2m_nsndranks);
391   handle->m2m_hdrs[idx].connid   = tag;  
392   handle->m2m_hdrs[idx].dstrank  = lrank; 
393   handle->m2m_hdrs[idx].srcindex = handle->m2m_srankIndex;
394 }
395
396 static void  _internal_machine_send   ( pami_context_t      context, 
397                                         pami_endpoint_t     target_ep, 
398                                         int                 rank, 
399                                         int                 hdrsize,
400                                         char              * hdr,
401                                         int                 size, 
402                                         char              * msg,
403                                         pami_event_function cb_done,
404                                         void              * cd)
405 {
406   if (size < 128) {
407     pami_send_immediate_t parameters;
408     parameters.dispatch        = (size == 8)? M2M_PAMI_S8DISPATCH : M2M_PAMI_SDISPATCH;
409     //parameters.dispatch        = M2M_PAMI_SDISPATCH;
410     parameters.header.iov_base = hdr;
411     parameters.header.iov_len  = hdrsize;
412     parameters.data.iov_base   = msg;
413     parameters.data.iov_len    = size;
414     parameters.dest            = target_ep;
415     
416     PAMI_Send_immediate (context, &parameters);
417     //if (cb_done)
418     //cb_done (context, cd, PAMI_SUCCESS);
419   }
420   else {
421     pami_send_t parameters;
422     parameters.send.dispatch        = M2M_PAMI_DISPATCH;
423     parameters.send.header.iov_base = hdr;
424     parameters.send.header.iov_len  = hdrsize;
425     parameters.send.data.iov_base   = msg;
426     parameters.send.data.iov_len    = size;
427     parameters.events.cookie        = cd;
428     parameters.events.local_fn      = cb_done;
429     parameters.events.remote_fn     = NULL;
430     memset(&parameters.send.hints, 0, sizeof(parameters.send.hints));
431     parameters.send.dest            = target_ep;
432     
433     PAMI_Send (context, &parameters);
434   }
435 }
436
437 pami_result_t   _cmidirect_m2m_send_post_handler (pami_context_t     context,
438                                                   void             * cd) 
439 {
440   PAMI_M2mWork_t  *work = (PAMI_M2mWork_t *) cd;
441   PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)work->handle;
442   
443   int i = 0;
444   int pidx = 0;
445   char *buffer = NULL;
446   int bytes = NULL;
447
448   pami_event_function cb_done = m2m_send_done;
449   void *clientdata = handle;
450
451   if (handle->m2m_sdone == NULL) {
452     cb_done     = NULL;
453     clientdata  = NULL;
454   }
455
456   for (i = work->start; i < work->end; ++i) {
457     pidx   = handle->m2m_permutation[i];
458     buffer = handle->m2m_sndbuf + handle->m2m_sdispls[pidx];
459     bytes  = handle->m2m_sndlens[pidx];
460     
461     _internal_machine_send(context,
462                            handle->m2m_node_eps[pidx],
463                            handle->m2m_lranks[pidx],
464                            sizeof(PAMI_M2mHeader),
465                            (char*)&(handle->m2m_hdrs[pidx]),
466                            bytes, 
467                            buffer,
468                            cb_done,
469                            clientdata);
470   }  
471
472   return PAMI_SUCCESS;
473 }
474
475
476 void _cmidirect_m2m_initialize (pami_context_t *contexts, int nc) {
477   pami_dispatch_hint_t options = (pami_dispatch_hint_t) {0};
478   pami_dispatch_callback_function pfn;
479   int i = 0;
480   for (i = 0; i < nc; ++i) {
481     pfn.p2p = m2m_pkt_dispatch;
482     PAMI_Dispatch_set (contexts[i],
483                        M2M_PAMI_DISPATCH,
484                        pfn,
485                        NULL,
486                        options);
487
488     pfn.p2p = m2m_spkt_dispatch;
489     PAMI_Dispatch_set (contexts[i],
490                        M2M_PAMI_SDISPATCH,
491                        pfn,
492                        NULL,
493                        options);
494
495     pfn.p2p = m2m_s8_dispatch;
496     PAMI_Dispatch_set (contexts[i],
497                        M2M_PAMI_S8DISPATCH,
498                        pfn,
499                        NULL,
500                        options);
501   }
502 }
503
504
505 void   CmiDirect_manytomany_start ( void       * h,
506                                     unsigned     tag ) {
507   PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
508   assert (tag < MAX_CONN);
509
510   //printf ("Calling manytomany_start for conn %d handle %p on rank %d\n", tag, 
511   //  handle, CmiMyPe());
512   
513 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
514   //we have a completion callback
515   if (handle->m2m_sdone != NULL) {
516     PAMI_Context_post ( handle->swork[0].context, 
517                        &handle->swork[0].work, 
518                        _cmidirect_m2m_send_post_handler,
519                        &handle->swork[0]);
520   }
521   else {
522     int i;
523     for (i = 0; i < handle->n_work; ++i) {
524       PAMI_Context_post( handle->swork[i].context, 
525                         &handle->swork[i].work, 
526                         _cmidirect_m2m_send_post_handler,
527                         &handle->swork[i]);
528     }
529   }
530 #else
531   PAMIX_CONTEXT_LOCK(MY_CONTEXT());
532   _cmidirect_m2m_send_post_handler (MY_CONTEXT(), &handle->swork[0]);
533   PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
534 #endif
535 }