Bug fixed: the multi pool scheme is only used if CMK_SMP is on.
[charm.git] / src / arch / net / machine-ibverbs.c
1 /** @file
2  *
3  * Ibverbs (infiniband)  implementation of Converse NET version
4  * @ingroup NET
5  * contains only Ibverbs specific code for:
6  * - CmiMachineInit()
7  * - CmiCommunicationInit()
8  * - CmiNotifyStillIdle()
9  * - DeliverViaNetwork()
10  * - CommunicationServer()
11  * - CmiMachineExit()
12
13   created by 
14         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
15 */
16
17 /**
18  * @addtogroup NET
19  * @{
20  */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <malloc.h>
28 #include <getopt.h>
29 #include <time.h>
30
31 #include <infiniband/verbs.h>
32
/* ---- Transport tuning parameters (values are assigned in CmiMachineInit) ---- */
enum ibv_mtu mtu = IBV_MTU_2048;	/* path MTU requested when a qp is moved to RTR */
static int page_size;		/* system page size, recorded by allocateInfiBufferPool */
static int mtu_size;		/* base unit from which packetSize is derived */
static int packetSize;		/* bytes per send/receive packet, header included */
static int dataSize;		/* payload bytes per packet: packetSize minus the infiPacketHeader */
static int rdma;		/* non-zero turns RDMA on */
static int rdmaThreshold;	/* NOTE(review): presumably the message size from which RDMA is used */
static int firstBinSize;	/* must stay below rdmaThreshold (asserted in CmiMachineInit) */
static int blockAllocRatio;
static int blockThreshold;


static int maxRecvBuffers;	/* number of receive buffers posted to the shared receive queue */
static int maxTokens;		/* send credits; also sizes the send CQ and each qp's send queue */
//static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
static int sendPacketPoolSize; /*total number of send buffers created*/

/* ---- Statistics / profiling counters ---- */
static double _startTime=0;
static int regCount;

static int pktCount;		/* packets posted (only maintained with CMK_IBVERBS_STATS) */
static int msgCount;
static int minTokensLeft;	/* low-water mark of context->tokensLeft (CMK_IBVERBS_STATS only) */


static double regTime;

//TODO: erase this
static double processLockTime;
static double _auxTime;

static double processBufferedTime;	/* averaged over processBufferedCount in CmiMachineExit */
static int processBufferedCount;

/* ---- Compile-time feature switches ---- */
#define CMK_IBVERBS_STATS 0		/* collect counters and print them at exit */
#define CMK_IBVERBS_TOKENS_FLOW 1	/* bound outstanding sends with context->tokensLeft */
#define CMK_IBVERBS_INCTOKENS 0 //never turn this on 
#define CMK_IBVERBS_DEBUG 0		/* add psn fields to headers/node data for sequence checks */
#define CMI_DIRECT_DEBUG 0
#define WC_LIST_SIZE 32		/* NOTE(review): presumably work completions polled per call */
/*#define WC_BUFFER_SIZE 100*/




/* Only used when CMK_IBVERBS_INCTOKENS is on (see EnqueuePacket) */
#define INCTOKENS_FRACTION 0.04
#define INCTOKENS_INCREASE .50

// flag for using a pool for every thread in SMP mode
// (without CMK_SMP the macro stays undefined, which #if evaluates as 0)
#if CMK_SMP
#define THREAD_MULTI_POOL 1
#endif

#if THREAD_MULTI_POOL 
#include "pcqueue.h"
PCQueue **queuePool;		/* NOTE(review): per-thread queues; users are outside this view */
void infi_CmiFreeDirect(void *ptr);
static inline void fillBufferPools();	/* called from CmiCommunicationInit */
#endif

#define INFIBARRIERPACKET 128

/* NOTE(review): payload of an increase-tokens acknowledgement; confirm against its users */
struct infiIncTokenAckPacket{
	int a;
};

/* Idle state is not needed by this layer; CmiNotifyGetState always returns NULL */
typedef struct {
char none;  
} CmiIdleState;

//TODO:ERASE this
/* debug counters reported by CmiMachineExit */
static int TESTneighbor;
static int TESTfrees;
106
107
108 /********
109 ** The notify idle methods
110 ***/
111
112 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
113
114 static void CmiNotifyStillIdle(CmiIdleState *s);
115
116
117 static void CmiNotifyBeginIdle(CmiIdleState *s)
118 {
119   CmiNotifyStillIdle(s);
120 }
121
122 void CmiNotifyIdle(void) {
123   CmiNotifyStillIdle(NULL);
124 }
125
/***************
Data Structures 
***********************/

/******
	This is a header attached to the beginning of every infiniband packet
*******/
/* Values for infiPacketHeader.code. They are distinct powers of two, so they
   can be combined with | (EnqueuePacket ORs INFIPACKETCODE_INCTOKENS in). */
#define INFIPACKETCODE_DATA 1
#define INFIPACKETCODE_INCTOKENS 2
#define INFIRDMA_START 4
#define INFIRDMA_ACK 8
#define INFIDIRECT_REQUEST 16
#define INFIPACKETCODE_INCTOKENSACK 32
#define INFIDUMMYPACKET 64

struct infiPacketHeader{
	char code;	/* combination of the INFI* flags above */
	int nodeNo;	/* sending node; the template context->header carries _Cmi_mynode */
#if	CMK_IBVERBS_DEBUG
	int psn;	/* debug only: per-destination packet sequence number */
#endif	
};

/*
	Types of rdma packets
*/
#define INFI_MESG 1 
#define INFI_DIRECT 2

/* Bookkeeping for one RDMA transfer.
   NOTE(review): field roles are inferred from names; the code that fills
   them is outside this view. */
struct infiRdmaPacket{
	int fromNodeNo;
	int type;		/* presumably INFI_MESG or INFI_DIRECT */
	struct ibv_mr key;
	struct ibv_mr *keyPtr;
	int remoteSize;
	char *remoteBuf;
	void *localBuffer;
	OutgoingMsg ogm;
	struct infiRdmaPacket *next,*prev;	/* doubly linked list links */
};


/** Represents a buffer that is used to receive messages
*/
#define BUFFER_RECV 1
#define BUFFER_RDMA 2
struct infiBuffer{
	int type;		/* BUFFER_RECV or BUFFER_RDMA */
	char *buf;		/* start of the buffer */
	int size;		/* buffer length in bytes */
	struct ibv_mr *key;	/* registration covering buf; allocateInfiBufferPool shares one key per pool */
};



/** At the moment it is a simple pool with just a list of buffers
	* TODO: extend it to make it an element in a linklist of pools
*/
struct infiBufferPool{
	int numBuffers;			/* number of entries in buffers */
	struct infiBuffer *buffers;	/* array of numBuffers buffers */
	struct infiBufferPool *next;	/* next pool; allocateInfiBufferPool sets it to NULL */
};

/*****
	It is the structure for the send buffers that are used
	to send messages to other nodes.
	A packet is sent with a two-entry gather list: elemList[0] describes the
	embedded header (set up in newPacket) and elemList[1] the payload
	(filled in by EnqueuePacket).
********/


typedef struct infiPacketStruct {	
	char *buf;			/* payload to send (elemList[1]) */
	int size;			/* -1 while the packet is unused */
	struct infiPacketHeader header;	/* header sent ahead of the payload (elemList[0]) */
	struct ibv_mr *keyHeader;	/* registration of the memory holding this packet; lkey of the header */
	struct OtherNodeStruct *destNode;	/* destination, set when the packet is enqueued */
	struct infiPacketStruct *next;	/* link in context->infiPacketFreeList */
	OutgoingMsg ogm;		/* message the packet belongs to, or NULL */
	struct ibv_sge elemList[2];
	struct ibv_send_wr wr;		/* send work request; wr_id holds the packet address */
}* infiPacket;

/*
typedef struct infiBufferedWCStruct{
	struct ibv_wc wcList[WC_BUFFER_SIZE];
	int count;
	struct infiBufferedWCStruct *next,*prev;
} * infiBufferedWC;
*/

#define BCASTLIST_SIZE 50

/* A broadcast message held back for later processing.
   NOTE(review): field roles inferred from names; confirm against the users. */
struct infiBufferedBcastStruct{
	char *msg;
	int size;
	int broot;	/* presumably the broadcast root */
	int asm_rank;
	int valid;
};

/* A fixed-capacity chunk of BCASTLIST_SIZE buffered broadcasts. Chunks are
   doubly linked; count presumably tracks how many entries are in use. */
typedef struct infiBufferedBcastPoolStruct{
	struct infiBufferedBcastPoolStruct *next,*prev;
	struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
	int count;

} *infiBufferedBcastPool;
232
233
234
235
/***
	This structure represents the data needed by the infiniband
	communication routines of a node
	TODO: add locking for the smp version
*/
struct infiContext {
	struct ibv_context	*context;	/* device handle from ibv_open_device */
	
	/* NOTE(review): not initialized by CmiMachineInit; possibly unused */
	fd_set	asyncFds;
	struct timeval tmo;
	
	int ibPort;				/* HCA port in use */
//	struct ibv_comp_channel *channel;
	struct ibv_pd		*pd;		/* protection domain for all registrations and qps */
	struct ibv_cq		*sendCq;	/* completion queue shared by all qps' sends */
	struct ibv_cq	*recvCq;		/* completion queue shared by all qps' receives */
	struct ibv_srq	*srq;			/* shared receive queue used by every qp */
	
	struct ibv_qp		**qp; //Array of qps (numNodes long) to temporarily store the queue pairs
										//It is used between CmiMachineInit and the call to node_addresses_store
										//when the qps are stored in the corresponding OtherNodes

	struct infiAddr *localAddr; //lid/qpn/psn of every local qp, kept until the addresses have been exchanged

	infiPacket infiPacketFreeList; 		/* singly linked list of reusable send packets */
	
	struct infiBufferPool *recvBufferPool;	/* buffers posted to srq */

	struct infiPacketHeader header;		/* template copied into every new packet */

	int srqSize;
	int sendCqSize,recvCqSize;
	int tokensLeft;				/* remaining send credits (CMK_IBVERBS_TOKENS_FLOW) */

	infiBufferedBcastPool bufferedBcastList;
	
	struct infiRdmaPacket *bufferedRdmaAcks;
	
	struct infiRdmaPacket *bufferedRdmaRequests;
/*	infiBufferedWC infiBufferedRecvList;*/
	
	int insideProcessBufferedBcasts;	/* NOTE(review): presumably a re-entrancy guard */
};

/* The single per-process ibverbs context, allocated in CmiMachineInit */
static struct infiContext *context;
281
282
283
284
/** Address of a queue pair as exchanged between nodes: the port LID, the
 qp number and the initial packet sequence number. context->localAddr holds
 one entry per remote node.
*/
struct infiAddr {
	int lid,qpn,psn;
};

/**
 Stored in the OtherNode structure in machine-dgram.c 
 Store the per node data for ibverbs layer
*/
/* Values of infiOtherNodeData.state; initInfiOtherNodeData starts a node in INFI_HEADER_DATA */
enum { INFI_HEADER_DATA=21,INFI_DATA};

struct infiOtherNodeData{
	struct ibv_qp *qp ;	/* RC queue pair connected to this node */
	int state;// does it expect a packet with a header (first packet) or one without
	/* NOTE(review): per-node token accounting; the visible send path uses the
	   global context->tokensLeft instead */
	int totalTokens;
	int tokensLeft;
	int nodeNo;		/* the remote node's number */
	
	int postedRecvs;
	int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
#if	CMK_IBVERBS_DEBUG	
	int psn;	/* last psn sent to this node (incremented per packet in EnqueuePacket) */
	int recvPsn;	/* presumably the last psn received from this node */
#endif	
};
312
313
/********************************
Memory management structures and types
*****************/

struct infiCmiChunkHeaderStruct;

/* Metadata kept in the infiCmiChunkHeader that precedes a buffer
   (reach it with METADATAFIELD). */
typedef struct infiCmiChunkMetaDataStruct {
	struct ibv_mr *key;	/* registration covering the chunk (newPacket uses its lkey) */
	int poolIdx;		/* NOTE(review): presumably the pool the chunk belongs to */
	void *nextBuf;		/* presumably the next free chunk of the same pool */
	struct infiCmiChunkHeaderStruct *owner;
	int count;

#if THREAD_MULTI_POOL
	int parentPe;						// the PE that allocated the buffer and must release it
#endif
} infiCmiChunkMetaData;




/* Metadata of the chunk whose user pointer is m: the infiCmiChunkHeader sits
   immediately before m in memory. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)

typedef struct {
	int size;//without infiCmiChunkHeader
	void *startBuf;		/* presumably the head of this pool's free chunks */
	int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 20		/* number of pools (length of a pool array) */
#define INFIMAXPERPOOL 100
#define INFIMULTIPOOL -5	/* NOTE(review): presumably a sentinel poolIdx */

/* Multi-pool builds allocate one pool array per thread when
   initInfiCmiChunkPools runs from CmiCommunicationInit; otherwise a single
   static array is initialized from CmiMachineInit. */
#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
355
356
357 static inline infiPacket newPacket(){
358         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
359         pkt->size = -1;
360         pkt->header = context->header;
361         pkt->next = NULL;
362         pkt->destNode = NULL;
363         pkt->keyHeader = METADATAFIELD(pkt)->key;
364         pkt->ogm=NULL;
365         CmiAssert(pkt->keyHeader!=NULL);
366         
367         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
368         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
369         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
370         
371         pkt->wr.wr_id = (uint64_t)pkt;
372         pkt->wr.sg_list = &(pkt->elemList[0]);
373         pkt->wr.num_sge = 2;
374         pkt->wr.opcode = IBV_WR_SEND;
375         pkt->wr.send_flags = IBV_SEND_SIGNALED;
376         pkt->wr.next = NULL;
377         
378         return pkt;
379 };
380
/**
 * FreeInfiPacket(pkt): push a send packet back onto context->infiPacketFreeList.
 * The packet is first reset to the unused state (size -1, no message).
 * The argument is evaluated exactly once. The do { } while (0) form makes the
 * macro a single statement, so it is safe inside an unbraced if/else.
 */
#define FreeInfiPacket(pkt) do { \
	infiPacket infiFreePkt_ = (pkt); \
	infiFreePkt_->size = -1; \
	infiFreePkt_->ogm = NULL; \
	infiFreePkt_->next = context->infiPacketFreeList; \
	context->infiPacketFreeList = infiFreePkt_; \
} while (0)

/**
 * MallocInfiPacket(pkt): assign to the lvalue pkt a packet taken from the
 * free list, or a newly created one (newPacket) when the list is empty.
 * The temporary has a name that cannot collide with typical caller variables.
 * (The old local `p` made MallocInfiPacket(p) assign p to itself.)
 */
#define MallocInfiPacket(pkt) do { \
	infiPacket infiMallocPkt_ = context->infiPacketFreeList; \
	if(infiMallocPkt_ == NULL){ \
		infiMallocPkt_ = newPacket(); \
	}else{ \
		context->infiPacketFreeList = infiMallocPkt_->next; \
	} \
	(pkt) = infiMallocPkt_; \
} while (0)
394
395
396
397
398 /******************CmiMachineInit and its helper functions*/
399 static inline int pollSendCq(const int toBuffer);
400
401 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
402 static uint16_t getLocalLid(struct ibv_context *context, int port);
403 static int  checkQp(struct ibv_qp *qp){
404         struct ibv_qp_attr attr;
405         struct ibv_qp_init_attr init_attr;
406                  
407         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
408         if(attr.cur_qp_state != IBV_QPS_RTS){
409                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
410                 return 0;
411         }
412         return 1;
413 }
414 static void checkAllQps(){
415         int i;
416         for(i=0;i<_Cmi_numnodes;i++){
417                 if(i != _Cmi_mynode){
418                         if(!checkQp(nodes[i].infiData->qp)){
419                                 pollSendCq(0);
420                                 CmiAssert(0);
421                         }
422                 }
423         }
424 }
425
426 #if CMK_IBVERBS_FAST_START
427 static void send_partial_init();
428 #endif
429
430 static void CmiMachineInit(char **argv){
431         struct ibv_device **devList;
432         struct ibv_device *dev;
433         int ibPort;
434         int i;
435         int calcMaxSize;
436         infiPacket *pktPtrs;
437         struct infiRdmaPacket **rdmaPktPtrs;
438
439         MACHSTATE(3,"CmiMachineInit {");
440         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
441         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
442         
443         //TODO: make the device and ibport configureable by commandline parameter
444         //Check example for how to do that
445         devList =  ibv_get_device_list(NULL);
446         CmiAssert(devList != NULL);
447
448         dev = *devList;
449         CmiAssert(dev != NULL);
450
451         ibPort=1;
452
453         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
454
455         context = (struct infiContext *)malloc(sizeof(struct infiContext));
456         
457         MACHSTATE1(3,"context allocated %p",context);
458         
459         //localAddr will store the local addresses of all the qps
460         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
461         
462         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
463         
464         context->ibPort = ibPort;
465         //the context for this infiniband device 
466         context->context = ibv_open_device(dev);
467         CmiAssert(context->context != NULL);
468         
469         MACHSTATE1(3,"device opened %p",context->context);
470
471 /*      FD_ZERO(&context->asyncFds);
472         FD_SET(context->context->async_fd,&context->asyncFds);
473         context->tmo.tv_sec=0;
474         context->tmo.tv_usec=0;
475         
476         MACHSTATE(3,"asyncFds zeroed and set");*/
477
478         //protection domain
479         context->pd = ibv_alloc_pd(context->context);
480         CmiAssert(context->pd != NULL);
481         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
482
483   /******** At this point we know that this node is more or less serviceable
484         So, this is a good point for sending the partial init message for the fast
485         start case
486         Moreover, no work dependent on the number of nodes has started yet.
487         ************/
488
489 #if CMK_IBVERBS_FAST_START
490   send_partial_init();
491 #endif
492
493
494         context->header.nodeNo = _Cmi_mynode;
495
496         mtu_size=1200;
497         packetSize = mtu_size*4;
498         dataSize = packetSize-sizeof(struct infiPacketHeader);
499         
500         calcMaxSize=8000;
501 /*      if(_Cmi_numnodes*50 > calcMaxSize){
502                 calcMaxSize = _Cmi_numnodes*50;
503                 if(calcMaxSize > 10000){
504                         calcMaxSize = 10000;
505                 }
506         }*/
507 //      maxRecvBuffers=80;
508         maxRecvBuffers=calcMaxSize;
509         maxTokens = maxRecvBuffers;
510 //      maxTokens = 80;
511         context->tokensLeft=maxTokens;
512         //tokensPerProcessor=4;
513         if(_Cmi_numnodes > 1){
514                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
515         }
516         
517         
518         //TURN ON RDMA
519         rdma=1;
520 //      rdmaThreshold=32768;
521         rdmaThreshold=22000;
522         firstBinSize = 120;
523         CmiAssert(rdmaThreshold > firstBinSize);
524         blockAllocRatio=16;
525         blockThreshold=8;
526
527 #if !THREAD_MULTI_POOL
528         initInfiCmiChunkPools();
529 #endif
530
531         /*create the pool of send packets*/
532         sendPacketPoolSize = maxTokens/2;       
533         if(sendPacketPoolSize > 2000){
534                 sendPacketPoolSize = 2000;
535         }
536         
537         context->infiPacketFreeList=NULL;
538         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
539
540         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
541 #if !THREAD_MULTI_POOL
542         for(i=0;i<sendPacketPoolSize;i++){
543                 MallocInfiPacket(pktPtrs[i]);   
544         }
545
546         for(i=0;i<sendPacketPoolSize;i++){
547                 FreeInfiPacket(pktPtrs[i]);     
548         }
549         free(pktPtrs);
550 #endif
551         
552         context->bufferedBcastList=NULL;
553         context->bufferedRdmaAcks = NULL;
554         context->bufferedRdmaRequests = NULL;
555         context->insideProcessBufferedBcasts=0;
556         
557         
558         if(rdma){
559 /*              int numPkts;
560                 int k;
561                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
562                         numPkts = _Cmi_numnodes*4;
563                 }else{
564                         numPkts = maxRecvBuffers/4;
565                 }
566                 
567                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
568                 for(k=0;k<numPkts;k++){
569                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
570                 }
571                 
572                 for(k=0;k<numPkts;k++){
573                         CmiFree(rdmaPktPtrs[k]);
574                 }
575                 free(rdmaPktPtrs);*/
576         }
577         
578 /*      context->infiBufferedRecvList = NULL;*/
579 #if CMK_IBVERBS_STATS   
580         regCount =0;
581         regTime  = 0;
582
583         pktCount=0;
584         msgCount=0;
585
586         processBufferedCount=0;
587         processBufferedTime=0;
588
589         //TODO: erase this
590         processLockTime = 0;
591
592         minTokensLeft = maxTokens;
593 #endif  
594
595         
596
597         MACHSTATE(3,"} CmiMachineInit");
598 }
599
/**
 * Per-thread communication initialization.
 * In the THREAD_MULTI_POOL build this creates the chunk pools and fills the
 * buffer pools. Otherwise there is nothing to do, because CmiMachineInit
 * already set up the single shared pool.
 *
 * @param argv command-line arguments (unused)
 */
void CmiCommunicationInit(char **argv)
{
	(void)argv;
#if THREAD_MULTI_POOL
	initInfiCmiChunkPools();
	fillBufferPools();
#endif
}
607
608 /*********
609         Open a qp for every processor
610 *****/
611 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
612         int myLid;
613         int i;
614         
615         
616         //find my lid
617         myLid = getLocalLid(context->context,ibPort);
618         
619         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
620
621         context->sendCqSize = maxTokens+2;
622         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
623         CmiAssert(context->sendCq != NULL);
624         
625         MACHSTATE1(3,"sendCq created %p",context->sendCq);
626         
627         
628         context->recvCqSize = maxRecvBuffers;
629         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
630         
631         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
632         CmiAssert(context->recvCq != NULL);
633         
634         //array of queue pairs
635
636         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
637
638         if(numNodes > 1)
639         {
640                 context->srqSize = (maxRecvBuffers+2);
641                 struct ibv_srq_init_attr srqAttr = {
642                         .attr = {
643                         .max_wr  = context->srqSize,
644                         .max_sge = 1
645                         }
646                 };
647                 context->srq = ibv_create_srq(context->pd,&srqAttr);
648                 CmiAssert(context->srq != NULL);
649         
650                 struct ibv_qp_init_attr initAttr = {
651                         .qp_type = IBV_QPT_RC,
652                         .send_cq = context->sendCq,
653                         .recv_cq = context->recvCq,
654                         .srq             = context->srq,
655                         .sq_sig_all = 0,
656                         .qp_context = NULL,
657                         .cap     = {
658                                 .max_send_wr  = maxTokens,
659                                 .max_send_sge = 2,
660                         },
661                 };
662                 struct ibv_qp_attr attr;
663
664                 attr.qp_state        = IBV_QPS_INIT;
665                 attr.pkey_index      = 0;
666                 attr.port_num        = ibPort;
667                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
668
669 /*              MACHSTATE1(3,"context->pd %p",context->pd);
670                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
671                 MACHSTATE1(3,"TEST QP %p",qp);*/
672
673                 for( i=0;i<numNodes;i++){
674                         if(i == myNode){
675                         }else{
676                                 localAddr[i].lid = myLid;
677                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
678                         
679                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
680                         
681                                 CmiAssert(context->qp[i] != NULL);
682                         
683                         
684                                 ibv_modify_qp(context->qp[i], &attr,
685                                           IBV_QP_STATE              |
686                                           IBV_QP_PKEY_INDEX         |
687                                         IBV_QP_PORT               |
688                                         IBV_QP_ACCESS_FLAGS);           
689
690                                 localAddr[i].qpn = context->qp[i]->qp_num;
691                                 localAddr[i].psn = lrand48() & 0xffffff;
692                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
693                         }
694                 }
695         }
696         MACHSTATE(3,"qps created");
697 };
698
699 void copyInfiAddr(ChInfiAddr *qpList){
700         int qpListIdx=0;
701         int i;
702         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
703         for(i=0;i<_Cmi_numnodes;i++){
704                 if(i == _Cmi_mynode){
705                 }else{
706                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
707                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
708                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
709                         qpListIdx++;
710                 }
711         }
712 }
713
714
715 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
716         struct ibv_port_attr attr;
717
718         if (ibv_query_port(dev_context, port, &attr))
719                 return 0;
720
721         return attr.lid;
722 }
723
724 /**************** END OF CmiMachineInit and its helper functions*/
725
726 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
727 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
728
729 /* Initial the infiniband specific data for a remote node
730         1. connect the qp and store it in and return it
731 **/
732 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
733         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
734         int err;
735         ret->state = INFI_HEADER_DATA;
736         ret->qp = context->qp[node];
737 //      ret->totalTokens = tokensPerProcessor;
738 //      ret->tokensLeft = tokensPerProcessor;
739         ret->nodeNo = node;
740 //      ret->postedRecvs = tokensPerProcessor;
741 #if     CMK_IBVERBS_DEBUG       
742         ret->psn = 0;
743         ret->recvPsn = 0;
744 #endif
745         
746         struct ibv_qp_attr attr = {
747                 .qp_state               = IBV_QPS_RTR,
748                 .path_mtu               = mtu,
749                 .dest_qp_num            = addr[1],
750                 .rq_psn                 = addr[2],
751                 .max_dest_rd_atomic     = 1,
752                 .min_rnr_timer          = 31,
753                 .ah_attr                = {
754                         .is_global      = 0,
755                         .dlid           = addr[0],
756                         .sl             = 0,
757                         .src_path_bits  = 0,
758                         .port_num       = context->ibPort
759                 }
760         };
761         
762         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
763         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
764         
765         if (err = ibv_modify_qp(ret->qp, &attr,
766           IBV_QP_STATE              |
767           IBV_QP_AV                 |
768           IBV_QP_PATH_MTU           |
769           IBV_QP_DEST_QPN           |
770           IBV_QP_RQ_PSN             |
771           IBV_QP_MAX_DEST_RD_ATOMIC |
772           IBV_QP_MIN_RNR_TIMER)) {
773                         MACHSTATE1(3,"ERROR %d",err);
774                         CmiAbort("failed to change qp state to RTR");
775         }
776
777         MACHSTATE(3,"qp state changed to RTR");
778         
779         attr.qp_state       = IBV_QPS_RTS;
780         attr.timeout        = 26;
781         attr.retry_cnt      = 20;
782         attr.rnr_retry      = 7;
783         attr.sq_psn         = context->localAddr[node].psn;
784         attr.max_rd_atomic  = 1;
785
786         
787         if (ibv_modify_qp(ret->qp, &attr,
788           IBV_QP_STATE              |
789           IBV_QP_TIMEOUT            |
790           IBV_QP_RETRY_CNT          |
791           IBV_QP_RNR_RETRY          |
792           IBV_QP_SQ_PSN             |
793           IBV_QP_MAX_QP_RD_ATOMIC)) {
794                         fprintf(stderr, "Failed to modify QP to RTS\n");
795                         exit(1);
796         }
797         MACHSTATE(3,"qp state changed to RTS");
798
799         MACHSTATE(3,"} initInfiOtherNodeData");
800         return ret;
801 }
802
803
804 void    infiPostInitialRecvs(){
805         //create the pool and post the receives
806         int numPosts;
807 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
808                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
809         }else{
810                 numPosts = maxRecvBuffers;
811         }*/
812
813         if(_Cmi_numnodes > 1){
814                 numPosts = maxRecvBuffers;
815         }else{
816                 numPosts = 0;
817         }
818         if(numPosts > 0){
819                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
820                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
821         }
822
823
824         free(context->qp);
825         context->qp = NULL;
826         free(context->localAddr);
827         context->localAddr= NULL;
828 }
829
830 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
831         int numBuffers;
832         int i;
833         int bigSize;
834         char *bigBuf;
835         struct infiBufferPool *ret;
836         struct ibv_mr *bigKey;
837
838         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
839
840         page_size = sysconf(_SC_PAGESIZE);
841         ret = malloc(sizeof(struct infiBufferPool));
842         ret->next = NULL;
843         numBuffers=ret->numBuffers = numRecvs;
844         
845         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
846         
847         bigSize = numBuffers*sizePerBuffer;
848         bigBuf=malloc(bigSize);
849         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
850         CmiAssert(bigKey != NULL);
851         
852         for(i=0;i<numBuffers;i++){
853                 struct infiBuffer *buffer =  &(ret->buffers[i]);
854                 buffer->type = BUFFER_RECV;
855                 buffer->size = sizePerBuffer;
856                 
857                 /*buffer->buf = malloc(sizePerBuffer);
858                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
859                 buffer->buf = &bigBuf[i*sizePerBuffer];
860                 buffer->key = bigKey;
861
862                 if(buffer->key == NULL){
863                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
864                         CmiAssert(buffer->key != NULL);
865                 }
866         }
867         return ret;
868 };
869
870
871
872 /**
873          Post the buffers as recv work requests
874 */
875 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
876         int j,err;
877         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
878         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
879         struct ibv_recv_wr *bad_wr;
880         
881         int startBufferIdx=0;
882         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
883         for(j=0;j<numRecvs;j++){
884                 
885                 
886                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
887                 sgElements[j].length = sizePerBuffer;
888                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
889                 
890                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
891                 workRequests[j].sg_list = &sgElements[j];
892                 workRequests[j].num_sge = 1;
893                 if(j != numRecvs-1){
894                         workRequests[j].next = &workRequests[j+1];
895                 }
896                 
897         }
898         workRequests[numRecvs-1].next = NULL;
899         MACHSTATE(3,"About to call ibv_post_srq_recv");
900         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
901                 CmiAssert(0);
902         }
903
904         free(workRequests);
905         free(sgElements);
906 }
907
908
909
910
911 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
912
913 static void CmiMachineExit()
914 {
915 #if CMK_IBVERBS_STATS   
916         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
917 #endif
918
919 #if THREAD_MULTI_POOL
920         printf("IBVERBS Multi pool version\n");
921 #endif
922
923         //TODO: erase this
924         printf("Alloc time = %.8f  Lock time = %.8f count = %d\n",processBufferedTime/processBufferedCount,processLockTime/processBufferedCount,processBufferedCount);
925
926         //TODO: ERASE this
927         MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
928 }
929 static void ServiceCharmrun_nolock();
930
931 static void CmiNotifyStillIdle(CmiIdleState *s) {
932 #if CMK_SMP
933         CmiCommLock();
934 /*      if(where == COMM_SERVER_FROM_SMP)*/
935 #endif
936 /*              ServiceCharmrun_nolock();*/
937
938         CommunicationServer_nolock(0);
939 #if CMK_SMP
940         CmiCommUnlock();
941 #endif
942 }
943
944 static inline void increaseTokens(OtherNode node);
945
946 static inline int pollRecvCq(const int toBuffer);
947 static inline int pollSendCq(const int toBuffer);
948
949
950 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
951 #if !CMK_IBVERBS_TOKENS_FLOW
952         return;
953 #else
954         //if(infiData->tokensLeft == 0){
955         if(context->tokensLeft == 0){
956                 MACHSTATE(3,"GET FREE TOKENS {{{");
957         }else{
958                 return;
959         }
960         while(context->tokensLeft == 0){
961                 CommunicationServer_nolock(1); 
962         }
963         MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
964 #endif
965 }
966
967
968 /**
969         Packetize this data and send it
970 **/
971
972
973
974 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
975         int incTokens=0;
976         int retval;
977 #if     CMK_IBVERBS_DEBUG
978         packet->header.psn = (++node->infiData->psn);
979 #endif  
980
981
982
983         packet->elemList[1].addr = (uintptr_t)packet->buf;
984         packet->elemList[1].length = size;
985         packet->elemList[1].lkey = dataKey->lkey;
986         
987         
988         packet->destNode = node;
989         
990 #if CMK_IBVERBS_STATS   
991         pktCount++;
992 #endif  
993         
994         getFreeTokens(node->infiData);
995
996 #if CMK_IBVERBS_INCTOKENS       
997         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
998                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
999                 incTokens=1;
1000         }
1001 #endif
1002 /*
1003         if(!checkQp(node->infiData->qp)){
1004                 pollSendCq(1);
1005                 CmiAssert(0);
1006         }*/
1007
1008         struct ibv_send_wr *bad_wr=NULL;
1009         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1010                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
1011                 CmiAssert(0);
1012         }
1013 #if     CMK_IBVERBS_TOKENS_FLOW
1014         context->tokensLeft--;
1015 #if     CMK_IBVERBS_STATS
1016         if(context->tokensLeft < minTokensLeft){
1017                 minTokensLeft = context->tokensLeft;
1018         }
1019 #endif
1020 #endif
1021
1022 /*      if(!checkQp(node->infiData->qp)){
1023                 pollSendCq(1);
1024                 CmiAssert(0);
1025         }*/
1026
1027 #if CMK_IBVERBS_INCTOKENS       
1028         if(incTokens){
1029                 increaseTokens(node);
1030         }
1031 #endif
1032
1033
1034 #if     CMK_IBVERBS_DEBUG
1035         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1036 #else
1037         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1038 #endif
1039
1040 };
1041
1042
1043 static void inline EnqueueDummyPacket(OtherNode node,int size){
1044         infiPacket packet;
1045         MallocInfiPacket(packet);
1046         packet->size = size;
1047         packet->buf = CmiAlloc(size);
1048         
1049         packet->header.code = INFIDUMMYPACKET;
1050
1051         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1052         
1053         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1054         EnqueuePacket(node,packet,size,key);
1055 }
1056
1057
1058
1059
1060
1061
1062 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1063         infiPacket packet;
1064         MallocInfiPacket(packet);
1065         packet->size = size;
1066         packet->buf=data;
1067         
1068         //the nodeNo is added at time of packet allocation
1069         packet->header.code = INFIPACKETCODE_DATA;
1070         
1071         ogm->refcount++;
1072         packet->ogm = ogm;
1073         
1074         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1075         CmiAssert(key != NULL);
1076         
1077         EnqueuePacket(node,packet,size,key);
1078 };
1079
1080 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1081 static inline void processAllBufferedMsgs();
1082
1083 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1084         int size; char *data;
1085 //      processAllBufferedMsgs();
1086
1087         
1088         ogm->refcount++;
1089   size = ogm->size;
1090   data = ogm->data;
1091
1092 #if CMK_IBVERBS_STATS   
1093         msgCount++;
1094 #endif
1095
1096         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1097         //First packet has dgram header, other packets dont
1098         
1099   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1100         
1101         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1102
1103         if(rdma && size > rdmaThreshold){
1104                         EnqueueRdmaPacket(ogm,node);
1105         }else{
1106         
1107                 while(size > dataSize){
1108                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1109                         size -= dataSize;
1110                         data += dataSize;
1111                 }
1112                 if(size > 0){
1113                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1114                 }
1115         }
1116 //#if   !CMK_SMP
1117         processAllBufferedMsgs();
1118 //#endif
1119         ogm->refcount--;
1120         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1121 }
1122
1123
1124 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1125         infiPacket packet;
1126
1127         ogm->refcount++;
1128         
1129         MallocInfiPacket(packet);
1130  
1131  {
1132                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1133
1134                 
1135                 packet->size = sizeof(struct infiRdmaPacket);
1136                 packet->buf = (char *)rdmaPacket;
1137                 
1138 /*              struct ibv_mr *key = ibv_reg_mr(context->pd,ogm->data,ogm->size,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE);*/
1139                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1140                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1141                 
1142                 packet->header.code = INFIRDMA_START;
1143                 packet->header.nodeNo = _Cmi_mynode;
1144                 packet->ogm = NULL;
1145
1146                 rdmaPacket->type = INFI_MESG;
1147                 rdmaPacket->ogm = ogm;
1148                 rdmaPacket->key = *key;
1149                 rdmaPacket->keyPtr = key;
1150                 rdmaPacket->remoteBuf = ogm->data;
1151                 rdmaPacket->remoteSize = ogm->size;
1152                 
1153                 
1154                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1155                 
1156                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1157                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1158         }
1159 }
1160
1161
1162
1163 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1164 static inline void processSendWC(struct ibv_wc *sendWC);
1165 static unsigned int _count=0;
1166 extern int errno;
1167 static int _countAsync=0;
1168 static inline void processAsyncEvents(){
1169         struct ibv_async_event event;
1170         int ready;
1171         _countAsync++;
1172         if(_countAsync < 1){
1173                 return;
1174         }
1175         _countAsync=0;
1176         FD_SET(context->context->async_fd,&context->asyncFds);
1177         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1178         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1179         
1180         if(ready==0){
1181                 return;
1182         }
1183         if(ready == -1){
1184 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1185                 return;
1186         }
1187         
1188         if (ibv_get_async_event(context->context, &event)){
1189                 return;
1190                 CmiAbort("get async event failed");
1191         }
1192         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1193         ibv_ack_async_event(&event);
1194
1195         
1196 }
1197
1198 static void pollCmiDirectQ();
1199
1200 static inline  void CommunicationServer_nolock(int toBuffer) {
1201         int processed;
1202         if(_Cmi_numnodes <= 1){
1203                 pollCmiDirectQ();
1204                 return;
1205         }
1206         MACHSTATE(2,"CommServer_nolock{");
1207         
1208 //      processAsyncEvents();
1209         
1210 //      checkAllQps();
1211
1212         pollCmiDirectQ();
1213
1214         processed = pollRecvCq(toBuffer);
1215         
1216
1217         processed += pollSendCq(toBuffer);
1218         
1219         if(toBuffer == 0){
1220 //              if(processed != 0)
1221                         processAllBufferedMsgs();
1222         }
1223         
1224 //      checkAllQps();
1225 //      _count--;
1226         MACHSTATE(2,"} CommServer_nolock ne");
1227         
1228 }
1229 /*
1230 static inline infiBufferedWC createInfiBufferedWC(){
1231         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1232         ret->count = 0;
1233         ret->next = ret->prev =NULL;
1234         return ret;
1235 }*/
1236
1237 /****
1238         The buffered recvWC are stored in a doubly linked list of 
1239         arrays or blocks of wcs.
1240         To keep the average insert cost low, a new block is added 
1241         to the top of the list. (resulting in a reverse seq of blocks)
1242         Within a block however wc are stored in a sequence
1243 *****/
1244 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1245         infiBufferedWC block;
1246         MACHSTATE(3,"Insert Buffered Recv called");
1247         if( context->infiBufferedRecvList ==NULL){
1248                 context->infiBufferedRecvList = createInfiBufferedWC();
1249                 block = context->infiBufferedRecvList;
1250         }else{
1251                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1252                         block = createInfiBufferedWC();
1253                         context->infiBufferedRecvList->prev = block;
1254                         block->next = context->infiBufferedRecvList;
1255                         context->infiBufferedRecvList = block;
1256                 }else{
1257                         block = context->infiBufferedRecvList;
1258                 }
1259         }
1260         
1261         block->wcList[block->count] = *wc;
1262         block->count++;
1263 };
1264 */
1265
1266
1267 /********
1268 go through the blocks of bufferedWC. Process the last block first.
1269 Then the next one and so on. (Processing within a block should happen
1270 in sequence).
1271 Leave the last block in place to avoid having to allocate again
1272 ******/
1273 /*static inline void processBufferedRecvList(){
1274         infiBufferedWC start;
1275         start = context->infiBufferedRecvList;
1276         while(start->next != NULL){
1277                 start = start->next;
1278         }
1279         while(start != NULL){
1280                 int i=0;
1281                 infiBufferedWC tmp;
1282                 for(i=0;i<start->count;i++){
1283                         processRecvWC(&start->wcList[i]);
1284                 }
1285                 if(start != context->infiBufferedRecvList){
1286                         //not the first one
1287                         tmp = start;
1288                         start = start->prev;
1289                         free(tmp);
1290                         start->next = NULL;
1291                 }else{
1292                         start = start->prev;
1293                 }
1294         }
1295         context->infiBufferedRecvList->next = NULL;
1296         context->infiBufferedRecvList->prev = NULL;
1297         context->infiBufferedRecvList->count = 0;
1298 }
1299 */
1300
1301 static inline int pollRecvCq(const int toBuffer){
1302         int i;
1303         int ne;
1304         struct ibv_wc wc[WC_LIST_SIZE];
1305         
1306         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1307         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1308 //      CmiAssert(ne >=0);
1309         
1310         if(ne != 0){
1311                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1312         }
1313         
1314         for(i=0;i<ne;i++){
1315                 if(wc[i].status != IBV_WC_SUCCESS){
1316                         CmiAssert(0);
1317                 }
1318                 switch(wc[i].opcode){
1319                         case IBV_WC_RECV:
1320                                         processRecvWC(&wc[i],toBuffer);
1321                                 break;
1322                         default:
1323                                 CmiAbort("Wrong type of work completion object in recvq");
1324                                 break;
1325                 }
1326                         
1327         }
1328         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1329         return ne;
1330
1331 }
1332
1333 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1334
1335 static inline int pollSendCq(const int toBuffer){
1336         int i;
1337         int ne;
1338         struct ibv_wc wc[WC_LIST_SIZE];
1339
1340         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1341 //      CmiAssert(ne >=0);
1342         
1343         
1344         for(i=0;i<ne;i++){
1345                 if(wc[i].status != IBV_WC_SUCCESS){
1346                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1347 #if CMK_IBVERBS_STATS
1348         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1349 #endif
1350                         CmiAssert(0);
1351                 }
1352                 switch(wc[i].opcode){
1353                         case IBV_WC_SEND:{
1354                                 //message received
1355                                 processSendWC(&wc[i]);
1356                                 
1357                                 break;
1358                                 }
1359                         case IBV_WC_RDMA_READ:
1360                         {
1361 //                              processRdmaWC(&wc[i],toBuffer);
1362                                         processRdmaWC(&wc[i],1);
1363                                 break;
1364                         }
1365                         case IBV_WC_RDMA_WRITE:
1366                         {
1367                                 /*** used for CmiDirect puts 
1368                                 Nothing needs to be done on the sender side once send is done **/
1369                                 break;
1370                         }
1371                         default:
1372                                 CmiAbort("Wrong type of work completion object in recvq");
1373                                 break;
1374                 }
1375                         
1376         }
1377         return ne;
1378 }
1379
1380
/******************
Poll the charmrun control socket once.

CmiStdoutAdd also registers its own descriptors for the same call.
withDelayMs is handed to CMK_PIPE_DECL as the wait bound (presumably in
milliseconds, per the name — confirm).

The three ready flags (ctrlskt_ready_read, dataskt_ready_read,
dataskt_ready_write) are reset on every call. Only ctrlskt_ready_read is
set again here, when Cmi_charmrun_fd is readable.

Returns what CMK_PIPE_CALL reported. If that is -1 (e.g. an interrupted
wait), CMK_PIPE_CHECKERR runs first and the call is retried recursively
with zero delay.
*****************/
int CheckSocketsReady(int withDelayMs)
{   
  int nreadable;
  CMK_PIPE_DECL(withDelayMs);

  /* register the stdout-forwarding descriptors and, if connected, charmrun */
  CmiStdoutAdd(CMK_PIPE_SUB);
  if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);

  nreadable=CMK_PIPE_CALL();
  ctrlskt_ready_read = 0;
  dataskt_ready_read = 0;
  dataskt_ready_write = 0;
  
  if (nreadable == 0) {
    MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
    return nreadable;
  }
  if (nreadable==-1) {
    /* NOTE(review): CMK_PIPE_CHECKERR presumably aborts on errors other than EINTR — confirm */
    CMK_PIPE_CHECKERR();
    MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
    return CheckSocketsReady(0);
  }
  CmiStdoutCheck(CMK_PIPE_SUB);
  if (Cmi_charmrun_fd!=-1) 
	  ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
  MACHSTATE(1,"} CheckSocketsReady")
  return nreadable;
}
1413
1414
1415 /*** Service the charmrun socket
1416 *************/
1417
1418 static void ServiceCharmrun_nolock()
1419 {
1420   int again = 1;
1421   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1422   while (again)
1423   {
1424   again = 0;
1425   CheckSocketsReady(0);
1426   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1427   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1428   }
1429   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1430 }
1431
1432
1433
1434 static void CommunicationServer(int sleepTime, int where){
1435         if( where == COMM_SERVER_FROM_INTERRUPT){
1436                 return;
1437         }
1438 #if CMK_SMP
1439         if(where == COMM_SERVER_FROM_WORKER){
1440                 return;
1441         }
1442         if(where == COMM_SERVER_FROM_SMP){
1443 #endif
1444                 ServiceCharmrun_nolock();
1445 #if CMK_SMP
1446         }
1447         CmiCommLock();
1448 #endif
1449         CommunicationServer_nolock(0);
1450 #if CMK_SMP
1451         CmiCommUnlock();
1452 #endif
1453 }
1454
1455
static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);


void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);

/**
 * Feed one received packet payload into the per-node reassembly state
 * machine (node->infiData->state).
 *
 * INFI_HEADER_DATA: the packet starts a new message.
 * - The datagram header gives the total size (CmiMsgHeaderGetLength).
 * - A buffer of that size is CmiAlloc'd into node->asm_msg and this
 *   payload is copied in.
 * - The state stays INFI_HEADER_DATA if the whole message fit in this
 *   packet; otherwise it moves to INFI_DATA.
 *
 * INFI_DATA: the payload is appended at node->asm_fill.
 * - Every continuation packet except the last must be exactly dataSize
 *   bytes.
 * - The running total must not exceed asm_total.
 * - Violations abort.
 *
 * When the message is complete (state back to INFI_HEADER_DATA) it is passed
 * to handoverMessage with toBuffer = 1. This function's own toBuffer
 * argument is not used.
 *
 * @param nodeNo sending node
 * @param len    payload length of this packet
 * @param msg    payload; points into the receive buffer, so it is copied
 */
static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
	char *newmsg;
	
	MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
	
	OtherNode node = &nodes[nodeNo];
	/* message being assembled for this node; reset to NULL once one completes */
	newmsg = node->asm_msg;		
	
	/// This simple state machine determines if this packet marks the beginning of a new message
	// from another node, or if this is another in a sequence of packets
	switch(node->infiData->state){
		case INFI_HEADER_DATA:
		{
			int size;
			int rank, srcpe, seqno, magic, i;
			unsigned int broot;
			/* unpack the datagram header fields into the locals above */
			DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
			size = CmiMsgHeaderGetLength(msg);
			MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
//			CmiAssert(size > 0);
//			CmiAssert(nodes_by_pe[srcpe] == node);
			
//			CmiAssert(newmsg == NULL);
			if(len > size){
				CmiPrintf("size: %d, len:%d.\n", size, len);
				CmiAbort("\n\n\t\tLength mismatch!!\n\n");
			}
			newmsg = (char *)CmiAlloc(size);
			_MEMCHECK(newmsg);
			memcpy(newmsg, msg, len);
			/* remember destination rank, total and progress for the following packets */
			node->asm_rank = rank;
			node->asm_total = size;
			node->asm_fill = len;
			node->asm_msg = newmsg;
			node->infiData->broot = broot;
			if(len == size){
				//this is the only packet for this message 
				node->infiData->state = INFI_HEADER_DATA;
			}else{
				//there are more packets following
				node->infiData->state = INFI_DATA;
			}
			break;
		}
		case INFI_DATA:
		{
			/* a packet that does not finish the message must be full-sized */
			if(node->asm_fill + len < node->asm_total && len != dataSize){
				CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
				CmiAbort("packet in the middle does not have expected length");
			}
			if(node->asm_fill+len > node->asm_total){
				CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
				CmiAbort("\n\n\t\tLength mismatch!!\n\n");
			}
			//tODO: remove this
			memcpy(newmsg + node->asm_fill,msg,len);
			node->asm_fill += len;
			if(node->asm_fill == node->asm_total){
				node->infiData->state = INFI_HEADER_DATA;
			}else{
				node->infiData->state = INFI_DATA;
			}
			break;
		}
	}
	/// if this packet was the last packet in a message ie state was 
	/// reset to infi_header_data
	
	if(node->infiData->state == INFI_HEADER_DATA){
		int total_size = node->asm_total;
		node->asm_msg = NULL;
		/* always handed over in buffering mode (toBuffer = 1) */
		handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
		MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
	}
	
};
1537
/**
 * Deliver a fully reassembled message to its local destination(s).
 *
 * Broadcasts are forwarded first: DGRAM_BROADCAST always, and
 * DGRAM_NODEBROADCAST when CMK_NODE_QUEUE_AVAILABLE. Depending on the build
 * the forward goes along the spanning tree (SendSpanningChildren) or the
 * hypercube (SendHypercube). If toBuffer is set, the forward is deferred:
 * a copy is queued with insertBufferedBcast instead.
 *
 * Local delivery:
 * - DGRAM_BROADCAST: a copy goes to ranks 1.._Cmi_mynodesize-1 and newmsg
 *   itself to rank 0.
 * - Node broadcasts and node messages: pushed to the node queue.
 * - Anything else: pushed to the PE with that rank.
 * newmsg itself (not a copy) is pushed, so presumably the receiving queue
 * owns it afterwards — confirm.
 *
 * When toBuffer is 0, buffered messages are processed at the end.
 * NOTE(review): processMessage in this file always passes toBuffer = 1.
 */
void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
#if CMK_BROADCAST_SPANNING_TREE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		if(toBuffer){
			/* forward later from a private copy; newmsg is delivered locally below */
			insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
		}else{
			SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
		}
	}
#elif CMK_BROADCAST_HYPERCUBE
	if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
	  || rank == DGRAM_NODEBROADCAST
#endif
	 ){
		if(toBuffer){
			insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
		}else{
			SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
		}
	}
#endif


	/* local delivery */
	switch (rank) {
	case DGRAM_BROADCAST: {
		int i;
		for (i=1; i<_Cmi_mynodesize; i++){
			CmiPushPE(i, CopyMsg(newmsg, total_size));
		}
		CmiPushPE(0, newmsg);
		break;
	}
#if CMK_NODE_QUEUE_AVAILABLE
	case DGRAM_NODEBROADCAST: 
	case DGRAM_NODEMESSAGE: {
		CmiPushNode(newmsg);
		break;
	}
#endif
	default:
		{
			
			CmiPushPE(rank, newmsg);
		}
	}    /* end of switch */
	if(!toBuffer){
//#if !CMK_SMP		
		processAllBufferedMsgs();
//#endif
	}
}
1595
1596
1597 static inline void increasePostedRecvs(int nodeNo);
1598 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1599 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1600
1601 //struct infiDirectRequestPacket;
1602 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1603
1604 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1605         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1606         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1607         int nodeNo = header->nodeNo;
1608 #if     CMK_IBVERBS_DEBUG
1609         OtherNode node = &nodes[nodeNo];
1610 #endif
1611         
1612         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1613 #if     CMK_IBVERBS_DEBUG
1614         if(node->infiData->recvPsn == 0){
1615                 node->infiData->recvPsn = header->psn;
1616         }else{
1617                 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1618                 node->infiData->recvPsn++;
1619         }
1620         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1621 #else
1622         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1623 #endif
1624         
1625         if(header->code & INFIPACKETCODE_DATA){
1626                         
1627                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1628         }
1629         if(header->code & INFIDUMMYPACKET){
1630                 MACHSTATE(3,"Dummy packet");
1631         }
1632         if(header->code & INFIBARRIERPACKET){
1633                 MACHSTATE(3,"Barrier packet");
1634                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1635         }
1636
1637 #if CMK_IBVERBS_INCTOKENS       
1638         if(header->code & INFIPACKETCODE_INCTOKENS){
1639                 increasePostedRecvs(nodeNo);
1640         }
1641 #endif  
1642         if(rdma && header->code & INFIRDMA_START){
1643                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1644 //              if(toBuffer){
1645                         //TODO: make a function of this and use for both acks and requests
1646                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1647                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1648                         *copyPacket = *rdmaPacket;
1649                         copyPacket->fromNodeNo = nodeNo;
1650                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1651                         context->bufferedRdmaRequests = copyPacket;
1652                         copyPacket->next = tmp;
1653                         copyPacket->prev = NULL;
1654                         if(tmp != NULL){
1655                                 tmp->prev = copyPacket;
1656                         }
1657 /*              }else{
1658                         processRdmaRequest(rdmaPacket,nodeNo,0);
1659                 }*/
1660         }
1661         if(rdma && header->code & INFIRDMA_ACK){
1662                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1663                 processRdmaAck(rdmaPacket);
1664         }
1665 /*      if(header->code & INFIDIRECT_REQUEST){
1666                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1667                 processDirectRequest(directRequestPacket);
1668         }*/
1669         {
1670                 struct ibv_sge list = {
1671                         .addr   = (uintptr_t) buffer->buf,
1672                         .length = buffer->size,
1673                         .lkey   = buffer->key->lkey
1674                 };
1675         
1676                 struct ibv_recv_wr wr = {
1677                         .wr_id = (uint64_t)buffer,
1678                         .sg_list = &list,
1679                         .num_sge = 1,
1680                         .next = NULL
1681                 };
1682                 struct ibv_recv_wr *bad_wr;
1683         
1684                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1685                         CmiAssert(0);
1686                 }
1687         }
1688
1689 };
1690
1691
1692
1693
1694 static inline  void processSendWC(struct ibv_wc *sendWC){
1695
1696         infiPacket packet = (infiPacket )sendWC->wr_id;
1697 #if CMK_IBVERBS_TOKENS_FLOW
1698 //      packet->destNode->infiData->tokensLeft++;
1699         context->tokensLeft++;
1700 #endif
1701
1702         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1703         if(packet->ogm != NULL){
1704                 packet->ogm->refcount--;
1705                 if(packet->ogm->refcount == 0){
1706                         GarbageCollectMsg(packet->ogm); 
1707                 }
1708         }else{
1709                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1710
1711                 }
1712         }
1713
1714         FreeInfiPacket(packet);
1715 };
1716
1717
1718
1719 /********************************************************************/
1720 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1721         int nodeNo = fromNodeNo;
1722         OtherNode node = &nodes[nodeNo];
1723         struct infiRdmaPacket *rdmaPacket;
1724
1725         getFreeTokens(node->infiData);
1726 #if CMK_IBVERBS_TOKENS_FLOW
1727 //      node->infiData->tokensLeft--;
1728         context->tokensLeft--;
1729 #if     CMK_IBVERBS_STATS
1730         if(context->tokensLeft < minTokensLeft){
1731                 minTokensLeft = context->tokensLeft;
1732         }
1733 #endif
1734 #endif
1735         
1736         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1737 //      CmiAssert(buffer != NULL);
1738         
1739         
1740         if(isBuffered){
1741                 rdmaPacket = _rdmaPacket;
1742         }else{
1743                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1744                 *rdmaPacket = *_rdmaPacket;
1745         }
1746
1747
1748         rdmaPacket->fromNodeNo = fromNodeNo;
1749         rdmaPacket->localBuffer = (void *)buffer;
1750         
1751         buffer->type = BUFFER_RDMA;
1752         buffer->size = rdmaPacket->remoteSize;
1753         
1754         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1755 //      CmiAssert(buffer->buf != NULL);
1756
1757         buffer->key = METADATAFIELD(buffer->buf)->key;
1758
1759         
1760         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1761         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1762 //      CmiAssert(buffer->key != NULL);
1763         
1764         {
1765                 struct ibv_sge list = {
1766                         .addr = (uintptr_t )buffer->buf,
1767                         .length = buffer->size,
1768                         .lkey   = buffer->key->lkey
1769                 };
1770
1771                 struct ibv_send_wr *bad_wr;
1772                 struct ibv_send_wr wr = {
1773                         .wr_id = (uint64_t )rdmaPacket,
1774                         .sg_list = &list,
1775                         .num_sge = 1,
1776                         .opcode = IBV_WR_RDMA_READ,
1777                         .send_flags = IBV_SEND_SIGNALED,
1778                         .wr.rdma = {
1779                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1780                                 .rkey = rdmaPacket->key.rkey
1781                         }
1782                 };
1783                 /** post and rdma_read that is a rdma get*/
1784                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1785                         CmiAssert(0);
1786                 }
1787         }
1788
1789 };
1790
/* Forward declarations. EnqueueRdmaAck is needed by processRdmaWC below.
   processDirectWC's only visible use (in processRdmaWC) is commented out. */
static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1793
1794 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1795                 //rdma get done
1796 #if CMK_IBVERBS_STATS
1797         double _startRegTime;
1798 #endif  
1799
1800         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1801 /*      if(rdmaPacket->type == INFI_DIRECT){
1802                 processDirectWC(rdmaPacket);
1803                 return;
1804         }*/
1805 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1806         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1807
1808         /*TODO: remove this
1809         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1810         
1811 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1812         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1813         
1814         {
1815                 int size;
1816                 int rank, srcpe, seqno, magic, i;
1817                 unsigned int broot;
1818                 char *msg = buffer->buf;
1819                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1820                 size = CmiMsgHeaderGetLength(msg);
1821 /*              CmiAssert(size == buffer->size);*/
1822                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1823         }
1824         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1825
1826         
1827         free(buffer);
1828         
1829         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1830         //we are sending this ack as a response to a successful
1831         // rdma_Read.. the token for that rdma_Read needs to be freed
1832 #if CMK_IBVERBS_TOKENS_FLOW     
1833         //node->infiData->tokensLeft++;
1834         context->tokensLeft++;
1835 #endif
1836
1837         //send ack to sender if toBuffer is off otherwise buffer it
1838         if(toBuffer){
1839                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1840                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1841                 context->bufferedRdmaAcks = rdmaPacket;
1842                 rdmaPacket->next = tmp;
1843                 rdmaPacket->prev = NULL;
1844                 if(tmp != NULL){
1845                         tmp->prev = rdmaPacket; 
1846                 }
1847         }else{
1848                 EnqueueRdmaAck(rdmaPacket);             
1849                 free(rdmaPacket);
1850         }
1851 }
1852
1853 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1854         infiPacket packet;
1855         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1856
1857         
1858         MallocInfiPacket(packet);
1859         {
1860                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1861                 *ackPacket = *rdmaPacket;
1862                 packet->size = sizeof(struct infiRdmaPacket);
1863                 packet->buf = (char *)ackPacket;
1864                 packet->header.code = INFIRDMA_ACK;
1865                 packet->ogm=NULL;
1866                 
1867                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1868         
1869
1870                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1871         }
1872 };
1873
1874
1875 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1876         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1877         rdmaPacket->ogm->refcount--;
1878         GarbageCollectMsg(rdmaPacket->ogm);
1879 }
1880
1881
1882 /****************************
1883  Deal with all the buffered (delayed) messages
1884  such as processing recvd broadcasts, sending
1885  rdma acks and processing recvd rdma requests
1886 ******************************/
1887
1888
1889 static inline infiBufferedBcastPool createBcastPool(){
1890         int i;
1891         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1892         ret->count = 0;
1893         ret->next = ret->prev = NULL;   
1894         for(i=0;i<BCASTLIST_SIZE;i++){
1895                 ret->bcastList[i].valid = 0;
1896         }
1897         return ret;
1898 };
1899 /****
1900         The buffered bcast messages are stored in a doubly linked list of 
1901         arrays or blocks.
1902         To keep the average insert cost low, a new block is added 
1903         to the top of the list. (resulting in a reverse seq of blocks)
1904         Within a block however bcast are stored in increasing order sequence
1905 *****/
1906
1907 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1908         if(context->bufferedBcastList == NULL){
1909                 context->bufferedBcastList = createBcastPool();
1910         }else{
1911                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1912                         infiBufferedBcastPool tmp;
1913                         tmp = createBcastPool();
1914                         context->bufferedBcastList->prev = tmp;
1915                         tmp->next = context->bufferedBcastList;
1916                         context->bufferedBcastList = tmp;
1917                 }
1918         }
1919         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1920         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1921         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1922         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1923         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1924         
1925         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1926         
1927         context->bufferedBcastList->count++;
1928 }
1929
1930 /*********
1931         Go through the blocks of buffered bcast messages. process last block first
1932         processign within a block is in sequence though
1933 *********/
1934 static inline void processBufferedBcast(){
1935         infiBufferedBcastPool start;
1936
1937         if(context->bufferedBcastList == NULL){
1938                 return;
1939         }
1940         start = context->bufferedBcastList;
1941         if(context->insideProcessBufferedBcasts==1){
1942                 return;
1943         }
1944         context->insideProcessBufferedBcasts=1;
1945
1946         while(start->next != NULL){
1947                 start = start->next;
1948         }
1949         
1950         while(start != NULL){
1951                 int i=0;
1952                 infiBufferedBcastPool tmp;
1953                 if(start->count != 0){
1954                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1955                 }
1956                 for(i=0;i<start->count;i++){
1957                         if(start->bcastList[i].valid == 0){
1958                                 continue;
1959                         }
1960                         start->bcastList[i].valid=0;
1961                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1962 #if CMK_BROADCAST_SPANNING_TREE
1963         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1964 #if CMK_NODE_QUEUE_AVAILABLE
1965           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1966 #endif
1967          ){
1968                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
1969                                         }
1970 #elif CMK_BROADCAST_HYPERCUBE
1971         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1972 #if CMK_NODE_QUEUE_AVAILABLE
1973           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1974 #endif
1975          ){
1976                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
1977                                         }
1978 #endif
1979                 }
1980                 if(start->count != 0){
1981                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1982                 }
1983                 
1984                 tmp = start;
1985                 start = start->prev;
1986                 free(tmp);
1987                 if(start != NULL){
1988                         //not the first one
1989                         start->next = NULL;
1990                 }
1991         }
1992
1993         context->bufferedBcastList = NULL;
1994 /*      context->bufferedBcastList->prev = NULL;
1995         context->bufferedBcastList->count =0;   */
1996         context->insideProcessBufferedBcasts=0;
1997         MACHSTATE(2,"processBufferedBcast done ");
1998 };
1999
2000
2001 static inline void processBufferedRdmaAcks(){
2002         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
2003         if(start == NULL){
2004                 return;
2005         }
2006         while(start->next != NULL){
2007                 start = start->next;
2008         }
2009         while(start != NULL){
2010                 struct infiRdmaPacket *rdmaPacket=start;
2011                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
2012                 EnqueueRdmaAck(rdmaPacket);
2013                 start = start->prev;
2014                 free(rdmaPacket);
2015         }
2016         context->bufferedRdmaAcks=NULL;
2017 }
2018
2019
2020
2021 static inline void processBufferedRdmaRequests(){
2022         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2023         if(start == NULL){
2024                 return;
2025         }
2026         
2027         
2028         while(start->next != NULL){
2029                 start = start->next;
2030         }
2031         while(start != NULL){
2032                 struct infiRdmaPacket *rdmaPacket=start;
2033                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2034                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2035                 start = start->prev;
2036         }
2037         
2038         context->bufferedRdmaRequests=NULL;
2039 }
2040
2041
2042
2043
2044
/* Flush all deferred work in a fixed order: broadcasts, then RDMA acks, then
   RDMA read requests. With CMK_IBVERBS_STATS, the call count and the time
   spent are accumulated. */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double flushStart = CmiWallTimer();
	processBufferedCount++;
#endif

	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
	processBufferedTime += (CmiWallTimer()-flushStart);
#endif
}
2058
2059
2060 /*************************
2061         Increase tokens when short of them
2062 **********/
2063 static inline void increaseTokens(OtherNode node){
2064         int err;
2065         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2066         if(node->infiData->totalTokens + increase > maxTokens){
2067                 increase = maxTokens-node->infiData->totalTokens;
2068         }
2069         node->infiData->totalTokens += increase;
2070         node->infiData->tokensLeft += increase;
2071         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2072         //increase the size of the sendCq
2073         int currentCqSize = context->sendCqSize;
2074         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2075                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2076                 CmiAssert(0);
2077         }
2078         context->sendCqSize+= increase;
2079 };
2080
2081 static void increasePostedRecvs(int nodeNo){
2082         OtherNode node = &nodes[nodeNo];
2083         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2084         int recvIncrease = tokenIncrease;
2085         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2086                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2087         }
2088         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2089                 recvIncrease = maxRecvBuffers-context->srqSize;
2090         }
2091         node->infiData->postedRecvs+= recvIncrease;
2092         context->srqSize += recvIncrease;
2093         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2094         //increase the size of the recvCq
2095         int currentCqSize = context->recvCqSize;
2096         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2097                 CmiAssert(0);
2098         }
2099         context->recvCqSize += tokenIncrease;
2100         if(recvIncrease > 0){
2101                 //create another bufferPool and attach it to the top of the current one
2102                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2103                 newPool->next = context->recvBufferPool;
2104                 context->recvBufferPool = newPool;
2105                 postInitialRecvs(newPool,recvIncrease,packetSize);
2106         }
2107
2108 };
2109
2110
2111
2112
2113 /*********************************************
2114         Memory management routines for RDMA
2115
2116 ************************************************/
2117
2118 /**
2119         There are INFINUMPOOLS of memory.
2120         The first pool is of size firstBinSize.
2121         The ith pool is of size firstBinSize*2^i
2122 */
2123
2124 static void initInfiCmiChunkPools(){
2125         int i,j;
2126         int size = firstBinSize;
2127         int nodeSize;
2128
2129 #if THREAD_MULTI_POOL
2130         nodeSize = CmiMyNodeSize() + 1;
2131         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2132         for(i = 0; i < nodeSize; i++){
2133                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2134         }
2135         for(j = 0; j < nodeSize; j++){
2136                 size = firstBinSize;
2137                 for(i=0;i<INFINUMPOOLS;i++){
2138                         infiCmiChunkPools[j][i].size = size;
2139                         infiCmiChunkPools[j][i].startBuf = NULL;
2140                         infiCmiChunkPools[j][i].count = 0;
2141                         size *= 2;
2142                 }
2143         }
2144
2145         // creating the n^2 system of queues
2146         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2147         for(i = 0; i < nodeSize; i++){
2148                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2149         }
2150         for(i = 0; i < nodeSize; i++)
2151                 for(j = 0; j < nodeSize; j++)
2152                         queuePool[i][j] = PCQueueCreate();
2153
2154 #else
2155
2156         size = firstBinSize;    
2157         for(i=0;i<INFINUMPOOLS;i++){
2158                 infiCmiChunkPools[i].size = size;
2159                 infiCmiChunkPools[i].startBuf = NULL;
2160                 infiCmiChunkPools[i].count = 0;
2161                 size *= 2;
2162         }
2163 #endif
2164
2165 }
2166
2167 /***********
2168 Register memory for a part of a received multisend message
2169 *************/
2170 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2171         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2172         char *res=msg-sizeof(infiCmiChunkHeader);
2173         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2174         metaData->owner = NULL;
2175         metaData->poolIdx = INFIMULTIPOOL;
2176
2177         return metaData;
2178 };
2179
2180
2181 #if THREAD_MULTI_POOL
2182
2183 // Fills up the buffer pools for every thread in the node
2184 static inline void fillBufferPools(){
2185         int nodeSize, poolIdx, thread;
2186         infiCmiChunkMetaData *metaData;         
2187         infiCmiChunkHeader *hdr;
2188         int allocSize;
2189         int count=1;
2190         int i;
2191         struct ibv_mr *key;
2192         void *res;
2193
2194         // initializing values
2195         nodeSize = CmiMyNodeSize() + 1;
2196
2197         // iterating over all threads and all pools
2198         for(thread = 0; thread < nodeSize; thread++){
2199                 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2200                         allocSize = infiCmiChunkPools[thread][poolIdx].size;
2201                         if(poolIdx < blockThreshold){
2202                                 count = blockAllocRatio;
2203                         }else{
2204                                 count = 1;
2205                         }
2206                         res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2207                         hdr = res;
2208                         key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2209                         CmiAssert(key != NULL);
2210                         res += sizeof(infiCmiChunkHeader);
2211                         for(i=0;i<count;i++){
2212                                 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2213                                 metaData->key = key;
2214                                 metaData->owner = hdr;
2215                                 metaData->poolIdx = poolIdx;
2216                                 metaData->parentPe = thread;                                            // setting the parent PE
2217                                 if(i == 0){
2218                                         metaData->owner->metaData->count = count;
2219                                         metaData->nextBuf = NULL;
2220                                 }else{
2221                                         void *startBuf = res - sizeof(infiCmiChunkHeader);
2222                                         metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2223                                         infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2224                                         infiCmiChunkPools[thread][poolIdx].count++;
2225                                 }
2226                                 if(i != count-1){
2227                                         res += (allocSize+sizeof(infiCmiChunkHeader));
2228                                 }
2229                         }
2230                 }
2231         }       
2232 }
2233
2234 static inline void *getInfiCmiChunkThread(int dataSize){
2235         //find out to which pool this dataSize belongs to
2236         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2237         int ratio = dataSize/firstBinSize;
2238         int poolIdx=0;
2239         void *res;
2240         int i,j,nodeSize;
2241         void *pointer;
2242
2243         //printf("Hi\n");
2244         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2245         MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2246         
2247         while(ratio > 0){
2248                 ratio  = ratio >> 1;
2249                 poolIdx++;
2250         }
2251         MACHSTATE1(2,"This is %d",CmiMyRank());
2252         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2253
2254         // checking whether to analyze the free queues to reuse buffers 
2255         nodeSize = CmiMyNodeSize() + 1;
2256         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2257                 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2258                 for(i = 0; i < nodeSize; i++){
2259                         if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2260                                 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2261                                         pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2262                                         infi_CmiFreeDirect(pointer);    
2263                                 }
2264                         }
2265                 }       
2266         }
2267
2268         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2269                 infiCmiChunkMetaData *metaData;         
2270                 infiCmiChunkHeader *hdr;
2271                 int allocSize;
2272                 int count=1;
2273                 int i;
2274                 struct ibv_mr *key;
2275                 void *origres;
2276                 
2277                 
2278                 if(poolIdx < INFINUMPOOLS ){
2279                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2280                 }else{
2281                         allocSize = dataSize;
2282                 }
2283
2284                 if(poolIdx < blockThreshold){
2285                         count = blockAllocRatio;
2286                 }
2287                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2288                 hdr = res;
2289                 
2290                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2291                 CmiAssert(key != NULL);
2292                 
2293                 origres = (res += sizeof(infiCmiChunkHeader));
2294
2295                 for(i=0;i<count;i++){
2296                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2297                         metaData->key = key;
2298                         metaData->owner = hdr;
2299                         metaData->poolIdx = poolIdx;
2300                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2301
2302                         if(i == 0){
2303                                 metaData->owner->metaData->count = count;
2304                                 metaData->nextBuf = NULL;
2305                         }else{
2306                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2307                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2308                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2309                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2310                                 
2311                         }
2312                         if(i != count-1){
2313                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2314                         }
2315           }     
2316                 
2317                 
2318                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2319                 
2320                 return origres;
2321         }
2322         if(poolIdx < INFINUMPOOLS){
2323                 infiCmiChunkMetaData *metaData;                         
2324         
2325                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2326                 res += sizeof(infiCmiChunkHeader);
2327
2328                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2329                 metaData = METADATAFIELD(res);
2330
2331                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2332                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2333                 
2334                 metaData->nextBuf = NULL;
2335 //              CmiAssert(metaData->poolIdx == poolIdx);
2336
2337                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2338                 return res;
2339         }
2340
2341         CmiAssert(0);
2342
2343         
2344 };
2345 #else
/**
 * Single-pool allocator, used when THREAD_MULTI_POOL is off.
 *
 * Returns a pointer to a registered chunk large enough for dataSize bytes,
 * positioned just past its infiCmiChunkHeader.
 *
 * poolIdx is the number of significant bits in dataSize/firstBinSize, i.e.
 * floor(log2(dataSize/firstBinSize))+1, or 0 when dataSize < firstBinSize.
 * The chunk size of that pool (firstBinSize*2^poolIdx, see
 * initInfiCmiChunkPools) is therefore larger than dataSize.
 *
 * - If pool poolIdx has a free chunk, it is popped from the pool's free list.
 * - Otherwise, or when poolIdx >= INFINUMPOOLS, a new block is malloc'd and
 *   registered with ibv_reg_mr. It holds count chunks: blockAllocRatio when
 *   poolIdx < blockThreshold, 1 otherwise. The first chunk is returned and
 *   the others are pushed onto the pool.
 * - Requests beyond the last pool use chunks of exactly dataSize bytes.
 *
 * This function does no locking itself. infi_CmiAlloc calls it under
 * CmiMemLock() when CMK_SMP is set.
 */
static inline void *getInfiCmiChunk(int dataSize){
	//find out to which pool this dataSize belongs to
	// poolIdx = floor(log2(dataSize/firstBinSize))+1
	int ratio = dataSize/firstBinSize;
	int poolIdx=0;
	void *res;

	while(ratio > 0){
		ratio  = ratio >> 1;
		poolIdx++;
	}
	MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
	// allocation path: the pool is empty, or the request is larger than every pool
	if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
		infiCmiChunkMetaData *metaData;
		infiCmiChunkHeader *hdr;
		int allocSize;
		int count=1;
		int i;
		struct ibv_mr *key;
		void *origres;


		if(poolIdx < INFINUMPOOLS ){
			allocSize = infiCmiChunkPools[poolIdx].size;
		}else{
			allocSize = dataSize;
		}

		if(poolIdx < blockThreshold){
			count = blockAllocRatio;
		}
		// one contiguous block of count (header + chunk) slots, registered as a single region
		res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
		hdr = res;

		key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
		CmiAssert(key != NULL);

		// the returned pointer is the first chunk, just past its header
		origres = (res += sizeof(infiCmiChunkHeader));

		for(i=0;i<count;i++){
			// each chunk gets its own metadata; all share the block's key and owner header
			metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
			metaData->key = key;
			metaData->owner = hdr;
			metaData->poolIdx = poolIdx;

			if(i == 0){
				// first chunk: goes to the caller, and its metadata records
				// how many chunks share the block
				metaData->owner->metaData->count = count;
				metaData->nextBuf = NULL;
			}else{
				// remaining chunks: pushed at the head of the pool's free list
				void *startBuf = res - sizeof(infiCmiChunkHeader);
				metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
				infiCmiChunkPools[poolIdx].startBuf = startBuf;
				infiCmiChunkPools[poolIdx].count++;

			}
			if(i != count-1){
				res += (allocSize+sizeof(infiCmiChunkHeader));
			}
	  }


		// note: res now points at the last chunk, not at the returned one
		MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);

		return origres;
	}
	// reuse path: pop the head of the pool's free list
	if(poolIdx < INFINUMPOOLS){
		infiCmiChunkMetaData *metaData;

		res = infiCmiChunkPools[poolIdx].startBuf;
		res += sizeof(infiCmiChunkHeader);

		MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
		metaData = METADATAFIELD(res);

		infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
		MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);

		metaData->nextBuf = NULL;
//		CmiAssert(metaData->poolIdx == poolIdx);

		infiCmiChunkPools[poolIdx].count--;
		return res;
	}

	// unreachable: the two branches above cover every poolIdx
	CmiAssert(0);


};
2434 #endif
2435
2436
2437 void * infi_CmiAlloc(int size){
2438         void *res;
2439
2440         //TODO: erase this
2441         _startTime = CmiWallTimer();    
2442
2443 #if THREAD_MULTI_POOL
2444         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2445         res -= sizeof(CmiChunkHeader);
2446
2447         //TODO: erase this      
2448         processBufferedTime += CmiWallTimer() - _startTime;
2449         processBufferedCount++;
2450
2451         return res;
2452 #else
2453 #if CMK_SMP
2454         _auxTime = CmiWallTimer();
2455         CmiMemLock();
2456         processLockTime += CmiWallTimer() - _auxTime;
2457 #endif
2458 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2459                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2460
2461                 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));     
2462                 res -= sizeof(CmiChunkHeader);
2463 #if CMK_SMP     
2464         _auxTime = CmiWallTimer();
2465         CmiMemUnlock();
2466         processLockTime += CmiWallTimer() - _auxTime;
2467 #endif
2468 /*      }else{
2469                 res = malloc(size);
2470         }*/
2471         
2472         //TODO: erase this      
2473         processBufferedTime += CmiWallTimer() - _startTime;
2474         processBufferedCount++;
2475
2476         return res;
2477 #endif
2478 }
2479
2480 #if THREAD_MULTI_POOL
2481 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2482 void infi_CmiFreeDirect(void *ptr){
2483         int size;
2484         int parentPe;
2485         void *freePtr = ptr;
2486
2487         //ptr += sizeof(CmiChunkHeader);
2488         size = SIZEFIELD (ptr);
2489 /*      if(size > firstBinSize){*/
2490         infiCmiChunkMetaData *metaData;
2491         int poolIdx;
2492         //there is a infiniband specific header
2493         freePtr = ptr - sizeof(infiCmiChunkHeader);
2494         metaData = METADATAFIELD(ptr);
2495         poolIdx = metaData->poolIdx;
2496
2497         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2498 //      CmiAssert(poolIdx >= 0);
2499         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].count <= INFIMAXPERPOOL){
2500                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2501                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = freePtr;
2502                         infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2503
2504                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf,infiCmiChunkPools[CmiMyRank()][poolIdx].count);
2505                 }else{
2506                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2507                         metaData->owner->metaData->count--;
2508                         if(metaData->owner->metaData == metaData){
2509                                 //I am the owner
2510                                 if(metaData->owner->metaData->count == 0){
2511                                         //all the chunks have been freed
2512                                         ibv_dereg_mr(metaData->key);
2513                                         free(freePtr);
2514                                         free(metaData);
2515                                 }
2516                                 //if I am the owner and all the chunks have not been
2517                                 // freed dont free my metaData. will need later
2518                         }else{
2519                                 if(metaData->owner->metaData->count == 0){
2520                                         //need to free the owner's buffer and metadata
2521                                         freePtr = metaData->owner;
2522                                         ibv_dereg_mr(metaData->key);
2523                                         free(metaData->owner->metaData);
2524                                         free(freePtr);
2525                                 }
2526                                 free(metaData);
2527                         }
2528                 }
2529 }
2530
2531
2532 void infi_CmiFree(void *ptr){
2533         int i,j;
2534         int size;
2535         int parentPe;
2536         int nodeSize;
2537         void *pointer;
2538         void *freePtr = ptr;
2539         nodeSize = CmiMyNodeSize() + 1;
2540
2541         MACHSTATE(3,"Freeing");
2542
2543         ptr += sizeof(CmiChunkHeader);
2544         size = SIZEFIELD (ptr);
2545 /*      if(size > firstBinSize){*/
2546         infiCmiChunkMetaData *metaData;
2547         int poolIdx;
2548         //there is a infiniband specific header
2549         freePtr = ptr - sizeof(infiCmiChunkHeader);
2550         metaData = METADATAFIELD(ptr);
2551         poolIdx = metaData->poolIdx;
2552
2553         if(poolIdx == INFIMULTIPOOL){
2554                 /** this is a part of a received mult message  
2555                     it will be freed correctly later
2556                 **/
2557
2558                 return;
2559         }
2560
2561
2562         //TODO: erase this
2563         TESTfrees++;
2564
2565         // checking if this free operation is my responsibility
2566         parentPe = metaData->parentPe;
2567         if(parentPe != CmiMyRank()){
2568                 TESTneighbor++;
2569                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2570                 return;
2571         }
2572
2573
2574         infi_CmiFreeDirect(ptr);
2575
2576 // TODO: erase following code
2577 /*      // checking free request queues
2578         for(i = 0; i < nodeSize; i++){
2579                 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2580                         for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2581                                 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2582                                 infi_CmiFreeDirect(pointer);    
2583                         }
2584                 }
2585         }
2586 */
2587
2588 }
2589
2590 #else
2591 void infi_CmiFree(void *ptr){
2592         int size;
2593         void *freePtr = ptr;
2594         
2595 #if CMK_SMP     
2596         CmiMemLock();
2597 #endif
2598         ptr += sizeof(CmiChunkHeader);
2599         size = SIZEFIELD (ptr);
2600 /*      if(size > firstBinSize){*/
2601                 infiCmiChunkMetaData *metaData;
2602                 int poolIdx;
2603                 //there is a infiniband specific header
2604                 freePtr = ptr - sizeof(infiCmiChunkHeader);
2605                 metaData = METADATAFIELD(ptr);
2606                 poolIdx = metaData->poolIdx;
2607                 if(poolIdx == INFIMULTIPOOL){
2608                         /** this is a part of a received mult message  
2609                         it will be freed correctly later
2610                         **/
2611                         
2612                         return;
2613                 }
2614                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2615 //              CmiAssert(poolIdx >= 0);
2616                 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
2617                         metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2618                         infiCmiChunkPools[poolIdx].startBuf = freePtr;
2619                         infiCmiChunkPools[poolIdx].count++;
2620                         
2621                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2622                 }else{
2623                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2624                         metaData->owner->metaData->count--;
2625                         if(metaData->owner->metaData == metaData){
2626                                 //I am the owner
2627                                 if(metaData->owner->metaData->count == 0){
2628                                         //all the chunks have been freed
2629                                         ibv_dereg_mr(metaData->key);
2630                                         free(freePtr);
2631                                         free(metaData);
2632                                 }
2633                                 //if I am the owner and all the chunks have not been
2634                                 // freed dont free my metaData. will need later
2635                         }else{
2636                                 if(metaData->owner->metaData->count == 0){
2637                                         //need to free the owner's buffer and metadata
2638                                         freePtr = metaData->owner;
2639                                         ibv_dereg_mr(metaData->key);
2640                                         free(metaData->owner->metaData);
2641                                         free(freePtr);
2642                                 }
2643                                 free(metaData);
2644                         }
2645                 }       
2646 #if CMK_SMP     
2647         CmiMemUnlock();
2648 #endif
2649 /*      }else{
2650                 free(freePtr);
2651         }*/
2652 }
2653 #endif
2654
2655 /*********************************************************************************************
This section is for CmiDirect, a variant of persistent communication in which
the user can transfer data between processors without using Charm++ messages. This lets the user
send and receive data directly from the middle of their arrays, without any copying on either the
send or the receive side.
2660 *********************************************************************************************/
2661
/* Request describing a sender's registered buffer for a CmiDirect handle.
   NOTE(review): in this part of the file it is only referenced from
   commented-out code (processDirectRequest, infiDirectHandle.packet);
   possibly dead - confirm before relying on it. */
struct infiDirectRequestPacket{
	int senderProc;	/* id of the sending side */
	int handle;	/* CmiDirect handle number */
	struct ibv_mr senderKey;	/* copy of the sender's memory-region descriptor */
	void *senderBuf;	/* sender's buffer address */
	int senderBufSize;	/* size of senderBuf in bytes */
};
2669
2670 #include "cmidirect.h"
2671
/* Number of handle slots in one infiDirectHandleTable chunk; handle h lives in
   chunk h/MAXHANDLES at slot h%MAXHANDLES (see calcHandleTableIdx). */
#define MAXHANDLES 512

struct infiDirectHandleStruct;


/* Node of the CmiDirect polling queue. Each handle embeds its own node
   (infiDirectHandle.pollingQNode), so enqueueing allocates nothing. */
typedef struct directPollingQNodeStruct {
	struct infiDirectHandleStruct *handle;	/* handle this node belongs to */
	struct directPollingQNodeStruct *next;	/* next node in the queue; NULL at the tail */
	double *lastDouble;	/* address of the final double of the handle's buffer (arrival flag) */
} directPollingQNode;

/* State kept for one CmiDirect handle, either on the receiving side
   (recvHandleTable) or on the sending side (sendHandleTable). */
typedef struct infiDirectHandleStruct{
	int id;	/* handle number */
	void *buf;	/* local buffer: receive buffer on the receiver, send buffer on the sender */
	int size;	/* size of buf in bytes */
	struct ibv_mr *key;	/* registration of buf returned by ibv_reg_mr */
	void (*callbackFnPtr)(void *);	/* receiver: run when arrival is detected; NULL on the sender */
	void *callbackData;	/* argument passed to callbackFnPtr */
//	struct infiDirectRequestPacket *packet;
	struct infiDirectUserHandle userHandle;	/* copy of the user-visible handle */
	struct infiRdmaPacket *rdmaPacket;	/* sender: used as wr_id of the RDMA write work request */
	directPollingQNode pollingQNode;	/* embedded polling-queue node */
}	infiDirectHandle;

/* Fixed-size chunk of handles; the chunks for one peer node are chained via next. */
typedef struct infiDirectHandleTableStruct{
	infiDirectHandle handles[MAXHANDLES];
	struct infiDirectHandleTableStruct *next;
} infiDirectHandleTable;
2700
2701
// data structures 

/* FIFO of receive handles waiting for data; the nodes are embedded in the
   handles (infiDirectHandle.pollingQNode). */
directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;

/* Per-node chains of handle tables, one entry per node (_Cmi_numnodes),
   created lazily: sendHandleTable is indexed by receiver node,
   recvHandleTable by sender node. */
static infiDirectHandleTable **sendHandleTable=NULL;
static infiDirectHandleTable **recvHandleTable=NULL;

/* Last handle number issued for each sender node (initialised to -1). */
static int *recvHandleCount=NULL;
2710
2711 void addHandleToPollingQ(infiDirectHandle *handle){
2712 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2713         directPollingQNode *newNode = &(handle->pollingQNode);
2714         newNode->handle = handle;
2715         newNode->next = NULL;
2716         if(headDirectPollingQ==NULL){
2717                 /*empty pollingQ*/
2718                 headDirectPollingQ = newNode;
2719                 tailDirectPollingQ = newNode;
2720         }else{
2721                 tailDirectPollingQ->next = newNode;
2722                 tailDirectPollingQ = newNode;
2723         }
2724 };
2725 /*
2726 infiDirectHandle *removeHandleFromPollingQ(){
2727         if(headDirectPollingQ == NULL){
2728                 //polling Q is empty
2729                 return NULL;
2730         }
2731         directPollingQNode *retNode = headDirectPollingQ;
2732         if(headDirectPollingQ == tailDirectPollingQ){
2733                 //PollingQ has one node 
2734                 headDirectPollingQ = tailDirectPollingQ = NULL;
2735         }else{
2736                 headDirectPollingQ = headDirectPollingQ->next;
2737         }
2738         infiDirectHandle *retHandle = retNode->handle;
2739         free(retNode);
2740         return retHandle;
2741 }*/
2742
2743 static inline infiDirectHandleTable **createHandleTable(){
2744         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2745         int i;
2746         for(i=0;i<_Cmi_numnodes;i++){
2747                 table[i] = NULL;
2748         }
2749         return table;
2750 }
2751
2752 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2753         *tableIdx = handle/MAXHANDLES;
2754         *idx = handle%MAXHANDLES;
2755 };
2756
/* Write initialValue into the final sizeof(double) bytes of recvBuf.
   The polling code treats a change of this value as "data has arrived". */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	int offset = recvBufSize - sizeof(double);
	char *base = (char *)recvBuf;
	*(double *)(base + offset) = initialValue;
}
2764
2765
2766 /**
2767  To be called on the receiver to create a handle and return its number
2768 **/
2769 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2770         int newHandle;
2771         int tableIdx,idx;
2772         int i;
2773         infiDirectHandleTable *table;
2774         struct infiDirectUserHandle userHandle;
2775         
2776         CmiAssert(recvBufSize > sizeof(double));
2777         
2778         if(recvHandleTable == NULL){
2779                 recvHandleTable = createHandleTable();
2780                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2781                 for(i=0;i<_Cmi_numnodes;i++){
2782                         recvHandleCount[i] = -1;
2783                 }
2784         }
2785         if(recvHandleTable[senderNode] == NULL){
2786                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2787                 recvHandleTable[senderNode]->next = NULL;               
2788         }
2789         
2790         newHandle = ++recvHandleCount[senderNode];
2791         CmiAssert(newHandle >= 0);
2792         
2793         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2794         
2795         table = recvHandleTable[senderNode];
2796         for(i=0;i<tableIdx;i++){
2797                 if(table->next ==NULL){
2798                         table->next = malloc(sizeof(infiDirectHandleTable));
2799                         table->next->next = NULL;
2800                 }
2801                 table = table->next;
2802         }
2803         table->handles[idx].id = newHandle;
2804         table->handles[idx].buf = recvBuf;
2805         table->handles[idx].size = recvBufSize;
2806 #if CMI_DIRECT_DEBUG
2807         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2808 #endif
2809         table->handles[idx].callbackFnPtr = callbackFnPtr;
2810         table->handles[idx].callbackData = callbackData;
2811         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2812         CmiAssert(table->handles[idx].key != NULL);
2813
2814 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2815         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2816         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2817         
2818         userHandle.handle = newHandle;
2819         userHandle.recverNode = _Cmi_mynode;
2820         userHandle.senderNode = senderNode;
2821         userHandle.recverBuf = recvBuf;
2822         userHandle.recverBufSize = recvBufSize;
2823         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2824         userHandle.initialValue = initialValue;
2825         
2826         table->handles[idx].userHandle = userHandle;
2827         
2828         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2829
2830   {
2831          int index = table->handles[idx].size - sizeof(double);
2832    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2833         } 
2834         
2835         addHandleToPollingQ(&(table->handles[idx]));
2836         
2837 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2838         
2839         return userHandle;
2840 }
2841
2842 /****
2843  To be called on the sender to attach the sender's buffer to this handle
2844 ******/
2845 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2846         int tableIdx,idx;
2847         int i;
2848         int handle = userHandle->handle;
2849         int recverNode  = userHandle->recverNode;
2850
2851         infiDirectHandleTable *table;
2852
2853         if(sendHandleTable == NULL){
2854                 sendHandleTable = createHandleTable();
2855         }
2856         if(sendHandleTable[recverNode] == NULL){
2857                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2858                 sendHandleTable[recverNode]->next = NULL;
2859         }
2860         
2861         CmiAssert(handle >= 0);
2862         calcHandleTableIdx(handle,&tableIdx,&idx);
2863         
2864         table = sendHandleTable[recverNode];
2865         for(i=0;i<tableIdx;i++){
2866                 if(table->next ==NULL){
2867                         table->next = malloc(sizeof(infiDirectHandleTable));
2868                         table->next->next = NULL;
2869                 }
2870                 table = table->next;
2871         }
2872
2873         table->handles[idx].id = handle;
2874         table->handles[idx].buf = sendBuf;
2875
2876         table->handles[idx].size = sendBufSize;
2877 #if CMI_DIRECT_DEBUG
2878         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2879 #endif
2880         table->handles[idx].callbackFnPtr = table->handles[idx].callbackData = NULL;
2881         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2882         CmiAssert(table->handles[idx].key != NULL);
2883         table->handles[idx].userHandle = *userHandle;
2884         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2885         
2886         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2887         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2888         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2889         
2890         
2891 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2892         table->handles[idx].packet->senderProc = _Cmi_mynode;
2893         table->handles[idx].packet->handle = handle;
2894         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2895         table->handles[idx].packet->senderBuf = sendBuf;
2896         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2897         
2898         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2899 };
2900
2901
2902
2903
2904
2905 /****
2906 To be called on the sender to do the actual data transfer
2907 ******/
2908 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2909         int handle = userHandle->handle;
2910         int recverNode = userHandle->recverNode;
2911         if(recverNode == _Cmi_mynode){
2912                 /*when the sender and receiver are on the same
2913                 processor, just look up the sender and receiver
2914                 buffers and do a memcpy*/
2915
2916                 infiDirectHandleTable *senderTable;
2917                 infiDirectHandleTable *recverTable;
2918                 
2919                 int tableIdx,idx,i;
2920
2921                 
2922                 /*find entry for this handle in sender table*/
2923                 calcHandleTableIdx(handle,&tableIdx,&idx);
2924                 CmiAssert(sendHandleTable!= NULL);
2925                 senderTable = sendHandleTable[_Cmi_mynode];
2926                 CmiAssert(senderTable != NULL);
2927                 for(i=0;i<tableIdx;i++){
2928                         senderTable = senderTable->next;
2929                 }
2930
2931                 /**find entry for this handle in recver table*/
2932                 recverTable = recvHandleTable[recverNode];
2933                 CmiAssert(recverTable != NULL);
2934                 for(i=0;i< tableIdx;i++){
2935                         recverTable = recverTable->next;
2936                 }
2937                 
2938                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
2939                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
2940 #if CMI_DIRECT_DEBUG
2941                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
2942 #endif
2943                 // The polling Q should find you and handle the callback and pollingq entry
2944                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
2945                 
2946
2947         }else{
2948                 infiPacket packet;
2949                 int tableIdx,idx;
2950                 int i;
2951                 OtherNode node;
2952                 infiDirectHandleTable *table;
2953
2954                 calcHandleTableIdx(handle,&tableIdx,&idx);
2955
2956                 table = sendHandleTable[recverNode];
2957                 CmiAssert(table != NULL);
2958                 for(i=0;i<tableIdx;i++){
2959                         table = table->next;
2960                 }
2961
2962 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
2963 #if CMI_DIRECT_DEBUG
2964                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
2965 #endif
2966
2967                 
2968                 {
2969                         
2970                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
2971                         struct ibv_sge list = {
2972                                 .addr = (uintptr_t )table->handles[idx].buf,
2973                                 .length = table->handles[idx].size,
2974                                 .lkey   = table->handles[idx].key->lkey
2975                         };
2976                         
2977                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
2978
2979                         struct ibv_send_wr *bad_wr;
2980                         struct ibv_send_wr wr = {
2981                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
2982                                 .sg_list = &list,
2983                                 .num_sge = 1,
2984                                 .opcode = IBV_WR_RDMA_WRITE,
2985                                 .send_flags = IBV_SEND_SIGNALED,
2986                                 
2987                                 .wr.rdma = {
2988                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
2989                                         .rkey = remoteKey->rkey
2990                                 }
2991                         };
2992                         /** post and rdma_read that is a rdma get*/
2993                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
2994                                 CmiAssert(0);
2995                         }
2996                 }
2997
2998         /*      MallocInfiPacket (packet);
2999                 {
3000                         packet->size = sizeof(struct infiDirectRequestPacket);
3001                         packet->buf = (char *)(table->handles[idx].packet);
3002                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3003                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3004                 }*/
3005         }
3006
3007 };
3008
3009 /**** need not be called the first time *********/
3010 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3011   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3012 }
3013
3014 /**** need not be called the first time *********/
3015 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3016         int handle = userHandle->handle;
3017         int tableIdx,idx,i;
3018         infiDirectHandleTable *table;
3019         calcHandleTableIdx(handle,&tableIdx,&idx);
3020         
3021         table = recvHandleTable[userHandle->senderNode];
3022         CmiAssert(table != NULL);
3023         for(i=0;i<tableIdx;i++){
3024                 table = table->next;
3025         }
3026 #if CMI_DIRECT_DEBUG
3027   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3028 #endif  
3029         addHandleToPollingQ(&(table->handles[idx]));
3030         
3031
3032 }
3033
3034 /**** need not be called the first time *********/
3035 void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
3036         int handle = userHandle->handle;
3037         int tableIdx,idx,i;
3038         infiDirectHandleTable *table;
3039         
3040         initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3041
3042         calcHandleTableIdx(handle,&tableIdx,&idx);
3043         
3044         table = recvHandleTable[userHandle->senderNode];
3045         CmiAssert(table != NULL);
3046         for(i=0;i<tableIdx;i++){
3047                 table = table->next;
3048         }
3049 #if CMI_DIRECT_DEBUG
3050   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3051 #endif  
3052         addHandleToPollingQ(&(table->handles[idx]));
3053         
3054 }
3055
3056
3057 static int receivedDirectMessage(infiDirectHandle *handle){
3058 //      int index = handle->size - sizeof(double);
3059 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
3060         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3061                 return 0;
3062         }else{
3063                 (*(handle->callbackFnPtr))(handle->callbackData);       
3064                 return 1;
3065         }
3066         
3067 }
3068
3069
3070 static void pollCmiDirectQ(){
3071         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
3072         while(ptr != NULL){
3073                 if(receivedDirectMessage(ptr->handle)){
3074 #if CMI_DIRECT_DEBUG
3075       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
3076 #endif
3077                         directPollingQNode *delPtr = ptr;
3078                         /** has been received and delete this node***/
3079                         if(prevPtr == NULL){
3080                                 /** first in the pollingQ**/
3081                                 if(headDirectPollingQ == tailDirectPollingQ){
3082                                         /**only node in pollingQ****/
3083                                         headDirectPollingQ = tailDirectPollingQ = NULL;
3084                                 }else{
3085                                         headDirectPollingQ = headDirectPollingQ->next;
3086                                 }
3087                         }else{
3088                                 if(ptr == tailDirectPollingQ){
3089                                         /**last node is being deleted**/
3090                                         tailDirectPollingQ = prevPtr;
3091                                 }
3092                                 prevPtr->next = ptr->next;
3093                         }
3094                         ptr = ptr->next;
3095                 //      free(delPtr);
3096                 }else{
3097                         prevPtr = ptr;
3098                         ptr = ptr->next;
3099                 }
3100         }
3101 }
3102
3103
3104 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3105         int senderProc = directRequestPacket->senderProc;
3106         int handle = directRequestPacket->handle;
3107         int tableIdx,idx,i;
3108         infiDirectHandleTable *table;
3109         OtherNode node = nodes_by_pe[senderProc];
3110
3111         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3112
3113         calcHandleTableIdx(handle,&tableIdx,&idx);
3114
3115         table = recvHandleTable[senderProc];
3116         CmiAssert(table != NULL);
3117         for(i=0;i<tableIdx;i++){
3118                 table = table->next;
3119         }
3120         
3121         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3122         
3123         {
3124                 struct ibv_sge list = {
3125                         .addr = (uintptr_t )table->handles[idx].buf,
3126                         .length = table->handles[idx].size,
3127                         .lkey   = table->handles[idx].key->lkey
3128                 };
3129
3130                 struct ibv_send_wr *bad_wr;
3131                 struct ibv_send_wr wr = {
3132                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3133                         .sg_list = &list,
3134                         .num_sge = 1,
3135                         .opcode = IBV_WR_RDMA_READ,
3136                         .send_flags = IBV_SEND_SIGNALED,
3137                         .wr.rdma = {
3138                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3139                                 .rkey = directRequestPacket->senderKey.rkey
3140                         }
3141                 };
3142 //       post and rdma_read that is a rdma get
3143                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3144                         CmiAssert(0);
3145                 }
3146         }
3147                         
3148         
3149 };*/
3150 /*
3151 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3152         MACHSTATE(3,"processDirectWC");
3153         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3154         (*(handle->callbackFnPtr))(handle->callbackData);
3155 };
3156 */
3157
3158 static void sendBarrierMessage(int pe)
3159 {
3160   /* we will only need one packet */
3161   int size=32;
3162   OtherNode  node = nodes + pe;
3163   infiPacket packet;
3164   MallocInfiPacket(packet);
3165   packet->size = size;
3166   packet->buf = CmiAlloc(size);
3167   packet->header.code = INFIBARRIERPACKET;
3168   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3169   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3170   /*  pollSendCq(0);*/
3171   EnqueuePacket(node,packet,size,key);
3172 }
3173
3174 static void recvBarrierMessage()
3175 {
3176   int i;
3177   int ne;
3178   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3179   struct ibv_wc wc[1];
3180   struct ibv_wc *recvWC;
3181   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3182   int toBuffer=1; // buffer without processing recvd messages
3183   int barrierReached=0;
3184   struct infiBuffer *buffer = NULL;
3185   struct infiPacketHeader *header = NULL;
3186   int nodeNo=-1;
3187   int len=-1;
3188   while(!barrierReached)
3189     {
3190       /* gengbin's semantic will implode if more than one q is polled at a time */
3191       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3192       //        CmiAssert(ne >=0);
3193       if(ne != 0){
3194         MACHSTATE1(3,"recvBarrier ne %d",ne);
3195       }
3196       pollSendCq(1); 
3197       for(i=0;i<ne;i++){
3198         if(wc[i].status != IBV_WC_SUCCESS){
3199           CmiAssert(0);
3200         }
3201         switch(wc[i].opcode){
3202         case IBV_WC_RECV: /* we have something to consider*/
3203           recvWC=&wc[i];
3204           buffer = (struct infiBuffer *) recvWC->wr_id; 
3205           header = (struct infiPacketHeader *)buffer->buf;
3206           nodeNo = header->nodeNo;
3207           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3208
3209           if(header->code & INFIPACKETCODE_DATA){
3210             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3211           }
3212           if(header->code & INFIDUMMYPACKET){
3213             MACHSTATE(3,"Dummy packet");
3214           }
3215           if(header->code & INFIBARRIERPACKET){
3216             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3217             // now we are done
3218             barrierReached=1;
3219             /* semantically questionable */
3220             //processAllBufferedMsgs();
3221             //return;
3222           }
3223           if(rdma && header->code & INFIRDMA_START){
3224             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3225             //          if(toBuffer){
3226             //TODO: make a function of this and use for both acks and requests
3227             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3228             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3229             *copyPacket = *rdmaPacket;
3230             copyPacket->fromNodeNo = nodeNo;
3231             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3232             context->bufferedRdmaRequests = copyPacket;
3233             copyPacket->next = tmp;
3234             copyPacket->prev = NULL;
3235             if(tmp != NULL){
3236               tmp->prev = copyPacket;
3237             }
3238             /*          }else{
3239                         processRdmaRequest(rdmaPacket,nodeNo,0);
3240                         }*/
3241           }
3242           if(rdma && header->code & INFIRDMA_ACK){
3243             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3244             processRdmaAck(rdmaPacket);
3245           }
3246           {
3247             struct ibv_sge list = {
3248               .addr     = (uintptr_t) buffer->buf,
3249               .length = buffer->size,
3250               .lkey     = buffer->key->lkey
3251             };
3252         
3253             struct ibv_recv_wr wr = {
3254               .wr_id = (uint64_t)buffer,