f0c40be173608912b5298988265ba141639a4a6a
[charm.git] / src / arch / net / machine-ibverbs.c
/** @file
 * Ibverbs (InfiniBand) implementation of the Converse NET version.
4  * @ingroup NET
5  * contains only Ibverbs specific code for:
6  * - CmiMachineInit()
7  * - CmiCommunicationInit()
8  * - CmiNotifyStillIdle()
9  * - DeliverViaNetwork()
10  * - CommunicationServer()
11  * - CmiMachineExit()
12
13   created by 
14         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
15 */
16
17 /**
18  * @addtogroup NET
19  * @{
20  */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <malloc.h>
#include <getopt.h>
#include <time.h>

#include <infiniband/verbs.h>
32
33 enum ibv_mtu mtu = IBV_MTU_2048;
34 static int page_size;
35 static int mtu_size;
36 static int packetSize;
37 static int dataSize;
38 static int rdma;
39 static int rdmaThreshold;
40 static int firstBinSize;
41 static int blockAllocRatio;
42 static int blockThreshold;
43
44
45 static int maxRecvBuffers;
46 static int maxTokens;
47 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
48 static int sendPacketPoolSize; /*total number of send buffers created*/
49
50 static double _startTime=0;
51 static int regCount;
52
53 static int pktCount;
54 static int msgCount;
55 static int minTokensLeft;
56
57
58 static double regTime;
59
60 //TODO: erase this
61 static double processLockTime;
62 static double _auxTime;
63
64 static double processBufferedTime;
65 static int processBufferedCount;
66
/* Compile-time switches for the ibverbs layer (tested with #if, so they must stay macros). */
#define CMK_IBVERBS_STATS 0 /* 1: reset counters in CmiMachineInit and print them in CmiMachineExit */
#define CMK_IBVERBS_TOKENS_FLOW 1 /* 1: each send consumes context->tokensLeft; getFreeTokens waits at zero */
#define CMK_IBVERBS_INCTOKENS 0 //never turn this on
#define CMK_IBVERBS_DEBUG 0 /* 1: packet headers and per-node data carry sequence numbers (psn) */
#define CMI_DIRECT_DEBUG 0
#define WC_LIST_SIZE 32 /* NOTE(review): presumably the completion-polling batch size; not used in this part of the file */
/*#define WC_BUFFER_SIZE 100*/




/* Token-increase heuristic, only used when CMK_IBVERBS_INCTOKENS is on:
   EnqueuePacket asks for more tokens once tokensLeft < INCTOKENS_FRACTION*totalTokens. */
#define INCTOKENS_FRACTION 0.04
#define INCTOKENS_INCREASE .50

// flag for using a pool for every thread
#define THREAD_MULTI_POOL 0

#if THREAD_MULTI_POOL 
#include "pcqueue.h"
PCQueue **queuePool;
void infi_CmiFreeDirect(void *ptr);
static inline void fillBufferPools();
#endif

/* NOTE(review): appears to be a packet code for barrier traffic; its use is not visible here. */
#define INFIBARRIERPACKET 128
92
/* Payload of a token-increase acknowledgement.
   NOTE(review): the code that sends/receives it is not in this part of the file. */
struct infiIncTokenAckPacket{
        int a;
};

/* Converse idle-state handle. This layer keeps no idle state
   (CmiNotifyGetState() returns NULL), so the struct is only a placeholder. */
typedef struct {
char none;  
} CmiIdleState;

//TODO:ERASE this
/* Debug counters reported by CmiMachineExit() as "Free msgs %d not owned %d"
   (TESTfrees, TESTneighbor). */
static int TESTneighbor;
static int TESTfrees;
104
105
106 /********
107 ** The notify idle methods
108 ***/
109
110 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
111
112 static void CmiNotifyStillIdle(CmiIdleState *s);
113
114
115 static void CmiNotifyBeginIdle(CmiIdleState *s)
116 {
117   CmiNotifyStillIdle(s);
118 }
119
120 void CmiNotifyIdle(void) {
121   CmiNotifyStillIdle(NULL);
122 }
123
124 /***************
125 Data Structures 
126 ***********************/
127
128 /******
129         This is a header attached to the beginning of every infiniband packet
130 *******/
/* Values for infiPacketHeader.code. They are distinct powers of two, and
   EnqueuePacket ORs INFIPACKETCODE_INCTOKENS into an existing code, so they
   are combined as bit flags. */
#define INFIPACKETCODE_DATA 1
#define INFIPACKETCODE_INCTOKENS 2
#define INFIRDMA_START 4
#define INFIRDMA_ACK 8
#define INFIDIRECT_REQUEST 16
#define INFIPACKETCODE_INCTOKENSACK 32
#define INFIDUMMYPACKET 64

/* Header sent in front of every packet. newPacket() points the first
   scatter/gather element at this struct and sends sizeof(struct
   infiPacketHeader) raw bytes, so every node must share this exact layout. */
struct infiPacketHeader{
        char code;      /* combination of the packet-code flags above */
        int nodeNo;     /* sender; copied from context->header (= _Cmi_mynode) */
#if     CMK_IBVERBS_DEBUG
        int psn;        /* per-destination sequence number assigned in EnqueuePacket */
#endif  
};

/*
        Types of rdma packets
        (NOTE(review): presumably the values stored in infiRdmaPacket.type)
*/
#define INFI_MESG 1 
#define INFI_DIRECT 2

/* Descriptor of one RDMA transfer.
   NOTE(review): the code filling these fields is not in this part of the
   file; the meanings are inferred from names. */
struct infiRdmaPacket{
        int fromNodeNo;
        int type;
        struct ibv_mr key;
        struct ibv_mr *keyPtr;
        int remoteSize;
        char *remoteBuf;
        void *localBuffer;
        OutgoingMsg ogm;
        struct infiRdmaPacket *next,*prev;   /* presumably links for lists such as context->bufferedRdmaAcks */
};
164
165
166 /** Represents a buffer that is used to receive messages
167 */
/* Values for infiBuffer.type. */
#define BUFFER_RECV 1
#define BUFFER_RDMA 2
/* One registered buffer used as a receive (or RDMA) target. */
struct infiBuffer{
        int type;              /* BUFFER_RECV or BUFFER_RDMA */
        char *buf;             /* start of the buffer */
        int size;              /* capacity in bytes */
        struct ibv_mr *key;    /* registration covering buf; allocateInfiBufferPool shares one MR across a pool */
};



/** At the moment it is a simple pool with just a list of buffers.
        * TODO: extend it to make it an element in a linked list of pools
*/
struct infiBufferPool{
        int numBuffers;                /* number of entries in buffers[] */
        struct infiBuffer *buffers;    /* array of numBuffers descriptors */
        struct infiBufferPool *next;   /* next pool; allocateInfiBufferPool sets it to NULL */
};
187
188 /*****
189         It is the structure for the send buffers that are used
190         to send messages to other nodes
191 ********/
192
193
/* A send packet with a prebuilt two-element SEND work request.
   Note: the typedef name infiPacket is a POINTER to the struct. */
typedef struct infiPacketStruct {       
        char *buf;                          /* payload, sent as elemList[1] (set in EnqueuePacket) */
        int size;                           /* payload size; -1 while the packet is unused */
        struct infiPacketHeader header;     /* header, sent as elemList[0] */
        struct ibv_mr *keyHeader;           /* registration covering this packet (from its CmiAlloc metadata) */
        struct OtherNodeStruct *destNode;   /* destination node, set in EnqueuePacket */
        struct infiPacketStruct *next;      /* link in context->infiPacketFreeList */
        OutgoingMsg ogm;                    /* message this packet belongs to, or NULL */
        struct ibv_sge elemList[2];         /* [0] = header, [1] = payload */
        struct ibv_send_wr wr;              /* prebuilt work request; wr_id holds the packet address */
}* infiPacket;
205
206 /*
207 typedef struct infiBufferedWCStruct{
208         struct ibv_wc wcList[WC_BUFFER_SIZE];
209         int count;
210         struct infiBufferedWCStruct *next,*prev;
211 } * infiBufferedWC;
212 */
213
/* Number of broadcast entries held by one infiBufferedBcastPoolStruct. */
#define BCASTLIST_SIZE 50

/* A broadcast message buffered for later processing.
   NOTE(review): producers/consumers are not in this part of the file. */
struct infiBufferedBcastStruct{
        char *msg;
        int size;
        int broot;      /* presumably the broadcast root, cf. infiOtherNodeData.broot */
        int asm_rank;
        int valid;      /* presumably non-zero while the entry holds a message */
};

/* Fixed-size block of buffered broadcasts, linked through next/prev; the
   list head is context->bufferedBcastList (NULL after CmiMachineInit). */
typedef struct infiBufferedBcastPoolStruct{
        struct infiBufferedBcastPoolStruct *next,*prev;
        struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
        int count;      /* presumably the number of used entries in bcastList */

} *infiBufferedBcastPool;
230
231
232
233
/***
        Per-node state of the InfiniBand communication layer. A single
        instance, pointed to by the file-level `context`, is allocated in
        CmiMachineInit.
        TODO: add locking for the smp version
*/
struct infiContext {
        struct ibv_context      *context;   /* opened InfiniBand device */
        
        fd_set  asyncFds;                   /* NOTE(review): no active initialisation visible in this part of the file */
        struct timeval tmo;
        
        int ibPort;                         /* HCA port used for every QP */
//      struct ibv_comp_channel *channel;
        struct ibv_pd           *pd;        /* protection domain for all QPs and memory registrations */
        struct ibv_cq           *sendCq;    /* completion queue shared by all sends */
        struct ibv_cq   *recvCq;            /* completion queue shared by all receives */
        struct ibv_srq  *srq;               /* shared receive queue used by every QP */
        
        struct ibv_qp           **qp; //Array of numNodes QPs, one per remote node, created in createLocalQps.
                                      //Only needed until the QPs are stored in the corresponding OtherNodes
                                      //(see node_addresses_store); freed in infiPostInitialRecvs.

        struct infiAddr *localAddr; //lid, qpn and psn of each local QP, kept until they are sent to the peers; freed in infiPostInitialRecvs

        infiPacket infiPacketFreeList;      /* free list of send packets (FreeInfiPacket / MallocInfiPacket) */
        
        struct infiBufferPool *recvBufferPool; /* buffers posted to srq by infiPostInitialRecvs */

        struct infiPacketHeader header;     /* template copied into every new packet; nodeNo = this node */

        int srqSize;
        int sendCqSize,recvCqSize;
        int tokensLeft;                     /* send tokens left; starts at maxTokens, one consumed per send */

        infiBufferedBcastPool bufferedBcastList;
        
        struct infiRdmaPacket *bufferedRdmaAcks;
        
        struct infiRdmaPacket *bufferedRdmaRequests;
/*      infiBufferedWC infiBufferedRecvList;*/
        
        int insideProcessBufferedBcasts;    /* NOTE(review): presumably a re-entrancy guard; set to 0 in CmiMachineInit */
};

/* The single per-node instance; allocated in CmiMachineInit. */
static struct infiContext *context;
279
280
281
282
/** Address of one local queue pair, as advertised to the remote node it
 connects to: LID of the local port, QP number and initial packet sequence
 number. createLocalQps fills one entry per remote node.
*/
struct infiAddr {
        int lid,qpn,psn;
};

/**
 Values of infiOtherNodeData.state; initInfiOtherNodeData starts every node
 in INFI_HEADER_DATA.
*/
enum { INFI_HEADER_DATA=21,INFI_DATA};

/* Per-remote-node data of the ibverbs layer, stored in the OtherNode
   structure in machine-dgram.c. */
struct infiOtherNodeData{
        struct ibv_qp *qp ;    /* connected QP to this node (RTS after initInfiOtherNodeData) */
        int state;// does it expect a packet with a header (first packet) or one without
        int totalTokens;       /* only read under CMK_IBVERBS_INCTOKENS in the code visible here */
        int tokensLeft;
        int nodeNo;            /* index of the remote node */
        
        int postedRecvs;
        int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
#if     CMK_IBVERBS_DEBUG       
        int psn;               /* last sequence number sent to this node (incremented in EnqueuePacket) */
        int recvPsn;           /* presumably the last sequence number received from this node */
#endif  
};
310
311
312 /********************************
313 Memory management structures and types
314 *****************/
315
struct infiCmiChunkHeaderStruct;

/* Per-chunk bookkeeping for memory handed out by this layer's allocator.
   NOTE(review): the allocator is not in this part of the file; some field
   meanings below are inferred from names. */
typedef struct infiCmiChunkMetaDataStruct {
        struct ibv_mr *key;      /* registration covering the chunk; newPacket() uses its lkey */
        int poolIdx;             /* presumably the index into infiCmiChunkPools, or INFIMULTIPOOL */
        void *nextBuf;           /* presumably the link in a pool's free list */
        struct infiCmiChunkHeaderStruct *owner;
        int count;

#if THREAD_MULTI_POOL
        int parentPe;                                           // the PE that allocated the buffer and must release it
#endif
} infiCmiChunkMetaData;




/* Metadata of the chunk starting at m: it lives in the infiCmiChunkHeader
   immediately before m. NOTE(review): m is not parenthesised, so only pass a
   simple expression such as a variable name. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)

/* One size class of the chunk allocator. */
typedef struct {
        int size;//without infiCmiChunkHeader
        void *startBuf;          /* presumably the head of this pool's free chunks */
        int count;               /* presumably the number of chunks currently held */
} infiCmiChunkPool;

#define INFINUMPOOLS 20          /* number of entries in infiCmiChunkPools */
#define INFIMAXPERPOOL 100       /* presumably the cap on chunks cached per pool */
#define INFIMULTIPOOL -5         /* NOTE(review): sentinel value; its use is not visible here */

#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

/* Called from CmiMachineInit (single pool) or CmiCommunicationInit (THREAD_MULTI_POOL). */
static void initInfiCmiChunkPools();
353
354
355 static inline infiPacket newPacket(){
356         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
357         pkt->size = -1;
358         pkt->header = context->header;
359         pkt->next = NULL;
360         pkt->destNode = NULL;
361         pkt->keyHeader = METADATAFIELD(pkt)->key;
362         pkt->ogm=NULL;
363         CmiAssert(pkt->keyHeader!=NULL);
364         
365         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
366         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
367         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
368         
369         pkt->wr.wr_id = (uint64_t)pkt;
370         pkt->wr.sg_list = &(pkt->elemList[0]);
371         pkt->wr.num_sge = 2;
372         pkt->wr.opcode = IBV_WR_SEND;
373         pkt->wr.send_flags = IBV_SEND_SIGNALED;
374         pkt->wr.next = NULL;
375         
376         return pkt;
377 };
378
/* Return send packet `pkt` to context->infiPacketFreeList, resetting size/ogm
 * so it reads as unused. `pkt` is evaluated exactly once, and the
 * do/while(0) wrapper makes the macro a single statement (safe in if/else). */
#define FreeInfiPacket(pkt) do { \
        infiPacket freeInfiPacket_p_ = (pkt); \
        freeInfiPacket_p_->size = -1; \
        freeInfiPacket_p_->ogm = NULL; \
        freeInfiPacket_p_->next = context->infiPacketFreeList; \
        context->infiPacketFreeList = freeInfiPacket_p_; \
} while (0)

/* Take a packet from context->infiPacketFreeList (or create one with
 * newPacket() when the list is empty) and store it in the lvalue `pkt`.
 * The temporary has a distinctive name so that MallocInfiPacket(p) still
 * assigns the caller's `p` (the old local `p` shadowed it). */
#define MallocInfiPacket(pkt) do { \
        infiPacket mallocInfiPacket_p_ = context->infiPacketFreeList; \
        if(mallocInfiPacket_p_ == NULL){ \
                mallocInfiPacket_p_ = newPacket(); \
        }else{ \
                context->infiPacketFreeList = mallocInfiPacket_p_->next; \
        } \
        (pkt) = mallocInfiPacket_p_; \
} while (0)
392
393
394
395
396 /******************CmiMachineInit and its helper functions*/
397 static inline int pollSendCq(const int toBuffer);
398
399 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
400 static uint16_t getLocalLid(struct ibv_context *context, int port);
401 static int  checkQp(struct ibv_qp *qp){
402         struct ibv_qp_attr attr;
403         struct ibv_qp_init_attr init_attr;
404                  
405         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
406         if(attr.cur_qp_state != IBV_QPS_RTS){
407                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
408                 return 0;
409         }
410         return 1;
411 }
412 static void checkAllQps(){
413         int i;
414         for(i=0;i<_Cmi_numnodes;i++){
415                 if(i != _Cmi_mynode){
416                         if(!checkQp(nodes[i].infiData->qp)){
417                                 pollSendCq(0);
418                                 CmiAssert(0);
419                         }
420                 }
421         }
422 }
423
424 #if CMK_IBVERBS_FAST_START
425 static void send_partial_init();
426 #endif
427
428 static void CmiMachineInit(char **argv){
429         struct ibv_device **devList;
430         struct ibv_device *dev;
431         int ibPort;
432         int i;
433         int calcMaxSize;
434         infiPacket *pktPtrs;
435         struct infiRdmaPacket **rdmaPktPtrs;
436
437         MACHSTATE(3,"CmiMachineInit {");
438         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
439         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
440         
441         //TODO: make the device and ibport configureable by commandline parameter
442         //Check example for how to do that
443         devList =  ibv_get_device_list(NULL);
444         CmiAssert(devList != NULL);
445
446         dev = *devList;
447         CmiAssert(dev != NULL);
448
449         ibPort=1;
450
451         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
452
453         context = (struct infiContext *)malloc(sizeof(struct infiContext));
454         
455         MACHSTATE1(3,"context allocated %p",context);
456         
457         //localAddr will store the local addresses of all the qps
458         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
459         
460         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
461         
462         context->ibPort = ibPort;
463         //the context for this infiniband device 
464         context->context = ibv_open_device(dev);
465         CmiAssert(context->context != NULL);
466         
467         MACHSTATE1(3,"device opened %p",context->context);
468
469 /*      FD_ZERO(&context->asyncFds);
470         FD_SET(context->context->async_fd,&context->asyncFds);
471         context->tmo.tv_sec=0;
472         context->tmo.tv_usec=0;
473         
474         MACHSTATE(3,"asyncFds zeroed and set");*/
475
476         //protection domain
477         context->pd = ibv_alloc_pd(context->context);
478         CmiAssert(context->pd != NULL);
479         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
480
481   /******** At this point we know that this node is more or less serviceable
482         So, this is a good point for sending the partial init message for the fast
483         start case
484         Moreover, no work dependent on the number of nodes has started yet.
485         ************/
486
487 #if CMK_IBVERBS_FAST_START
488   send_partial_init();
489 #endif
490
491
492         context->header.nodeNo = _Cmi_mynode;
493
494         mtu_size=1200;
495         packetSize = mtu_size*4;
496         dataSize = packetSize-sizeof(struct infiPacketHeader);
497         
498         calcMaxSize=8000;
499 /*      if(_Cmi_numnodes*50 > calcMaxSize){
500                 calcMaxSize = _Cmi_numnodes*50;
501                 if(calcMaxSize > 10000){
502                         calcMaxSize = 10000;
503                 }
504         }*/
505 //      maxRecvBuffers=80;
506         maxRecvBuffers=calcMaxSize;
507         maxTokens = maxRecvBuffers;
508 //      maxTokens = 80;
509         context->tokensLeft=maxTokens;
510         //tokensPerProcessor=4;
511         if(_Cmi_numnodes > 1){
512                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
513         }
514         
515         
516         //TURN ON RDMA
517         rdma=1;
518 //      rdmaThreshold=32768;
519         rdmaThreshold=22000;
520         firstBinSize = 120;
521         CmiAssert(rdmaThreshold > firstBinSize);
522         blockAllocRatio=16;
523         blockThreshold=8;
524
525 #if !THREAD_MULTI_POOL
526         initInfiCmiChunkPools();
527 #endif
528
529         /*create the pool of send packets*/
530         sendPacketPoolSize = maxTokens/2;       
531         if(sendPacketPoolSize > 2000){
532                 sendPacketPoolSize = 2000;
533         }
534         
535         context->infiPacketFreeList=NULL;
536         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
537
538         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
539 #if !THREAD_MULTI_POOL
540         for(i=0;i<sendPacketPoolSize;i++){
541                 MallocInfiPacket(pktPtrs[i]);   
542         }
543
544         for(i=0;i<sendPacketPoolSize;i++){
545                 FreeInfiPacket(pktPtrs[i]);     
546         }
547         free(pktPtrs);
548 #endif
549         
550         context->bufferedBcastList=NULL;
551         context->bufferedRdmaAcks = NULL;
552         context->bufferedRdmaRequests = NULL;
553         context->insideProcessBufferedBcasts=0;
554         
555         
556         if(rdma){
557 /*              int numPkts;
558                 int k;
559                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
560                         numPkts = _Cmi_numnodes*4;
561                 }else{
562                         numPkts = maxRecvBuffers/4;
563                 }
564                 
565                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
566                 for(k=0;k<numPkts;k++){
567                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
568                 }
569                 
570                 for(k=0;k<numPkts;k++){
571                         CmiFree(rdmaPktPtrs[k]);
572                 }
573                 free(rdmaPktPtrs);*/
574         }
575         
576 /*      context->infiBufferedRecvList = NULL;*/
577 #if CMK_IBVERBS_STATS   
578         regCount =0;
579         regTime  = 0;
580
581         pktCount=0;
582         msgCount=0;
583
584         processBufferedCount=0;
585         processBufferedTime=0;
586
587         //TODO: erase this
588         processLockTime = 0;
589
590         minTokensLeft = maxTokens;
591 #endif  
592
593         
594
595         MACHSTATE(3,"} CmiMachineInit");
596 }
597
/* Communication-thread setup. Only the THREAD_MULTI_POOL build has work to
   do here: it creates the chunk pools and fills the buffer pools. The
   single-pool build initialises its pools in CmiMachineInit instead. */
void CmiCommunicationInit(char **argv)
{
        (void)argv;     /* not needed by this layer */
#if !THREAD_MULTI_POOL
        return;
#else
        initInfiCmiChunkPools();
        fillBufferPools();
#endif
}
605
606 /*********
607         Open a qp for every processor
608 *****/
609 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
610         int myLid;
611         int i;
612         
613         
614         //find my lid
615         myLid = getLocalLid(context->context,ibPort);
616         
617         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
618
619         context->sendCqSize = maxTokens+2;
620         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
621         CmiAssert(context->sendCq != NULL);
622         
623         MACHSTATE1(3,"sendCq created %p",context->sendCq);
624         
625         
626         context->recvCqSize = maxRecvBuffers;
627         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
628         
629         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
630         CmiAssert(context->recvCq != NULL);
631         
632         //array of queue pairs
633
634         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
635
636         if(numNodes > 1)
637         {
638                 context->srqSize = (maxRecvBuffers+2);
639                 struct ibv_srq_init_attr srqAttr = {
640                         .attr = {
641                         .max_wr  = context->srqSize,
642                         .max_sge = 1
643                         }
644                 };
645                 context->srq = ibv_create_srq(context->pd,&srqAttr);
646                 CmiAssert(context->srq != NULL);
647         
648                 struct ibv_qp_init_attr initAttr = {
649                         .qp_type = IBV_QPT_RC,
650                         .send_cq = context->sendCq,
651                         .recv_cq = context->recvCq,
652                         .srq             = context->srq,
653                         .sq_sig_all = 0,
654                         .qp_context = NULL,
655                         .cap     = {
656                                 .max_send_wr  = maxTokens,
657                                 .max_send_sge = 2,
658                         },
659                 };
660                 struct ibv_qp_attr attr;
661
662                 attr.qp_state        = IBV_QPS_INIT;
663                 attr.pkey_index      = 0;
664                 attr.port_num        = ibPort;
665                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
666
667 /*              MACHSTATE1(3,"context->pd %p",context->pd);
668                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
669                 MACHSTATE1(3,"TEST QP %p",qp);*/
670
671                 for( i=0;i<numNodes;i++){
672                         if(i == myNode){
673                         }else{
674                                 localAddr[i].lid = myLid;
675                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
676                         
677                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
678                         
679                                 CmiAssert(context->qp[i] != NULL);
680                         
681                         
682                                 ibv_modify_qp(context->qp[i], &attr,
683                                           IBV_QP_STATE              |
684                                           IBV_QP_PKEY_INDEX         |
685                                         IBV_QP_PORT               |
686                                         IBV_QP_ACCESS_FLAGS);           
687
688                                 localAddr[i].qpn = context->qp[i]->qp_num;
689                                 localAddr[i].psn = lrand48() & 0xffffff;
690                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
691                         }
692                 }
693         }
694         MACHSTATE(3,"qps created");
695 };
696
697 void copyInfiAddr(ChInfiAddr *qpList){
698         int qpListIdx=0;
699         int i;
700         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
701         for(i=0;i<_Cmi_numnodes;i++){
702                 if(i == _Cmi_mynode){
703                 }else{
704                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
705                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
706                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
707                         qpListIdx++;
708                 }
709         }
710 }
711
712
713 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
714         struct ibv_port_attr attr;
715
716         if (ibv_query_port(dev_context, port, &attr))
717                 return 0;
718
719         return attr.lid;
720 }
721
722 /**************** END OF CmiMachineInit and its helper functions*/
723
724 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
725 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
726
727 /* Initial the infiniband specific data for a remote node
728         1. connect the qp and store it in and return it
729 **/
730 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
731         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
732         int err;
733         ret->state = INFI_HEADER_DATA;
734         ret->qp = context->qp[node];
735 //      ret->totalTokens = tokensPerProcessor;
736 //      ret->tokensLeft = tokensPerProcessor;
737         ret->nodeNo = node;
738 //      ret->postedRecvs = tokensPerProcessor;
739 #if     CMK_IBVERBS_DEBUG       
740         ret->psn = 0;
741         ret->recvPsn = 0;
742 #endif
743         
744         struct ibv_qp_attr attr = {
745                 .qp_state               = IBV_QPS_RTR,
746                 .path_mtu               = mtu,
747                 .dest_qp_num            = addr[1],
748                 .rq_psn                 = addr[2],
749                 .max_dest_rd_atomic     = 1,
750                 .min_rnr_timer          = 31,
751                 .ah_attr                = {
752                         .is_global      = 0,
753                         .dlid           = addr[0],
754                         .sl             = 0,
755                         .src_path_bits  = 0,
756                         .port_num       = context->ibPort
757                 }
758         };
759         
760         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
761         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
762         
763         if (err = ibv_modify_qp(ret->qp, &attr,
764           IBV_QP_STATE              |
765           IBV_QP_AV                 |
766           IBV_QP_PATH_MTU           |
767           IBV_QP_DEST_QPN           |
768           IBV_QP_RQ_PSN             |
769           IBV_QP_MAX_DEST_RD_ATOMIC |
770           IBV_QP_MIN_RNR_TIMER)) {
771                         MACHSTATE1(3,"ERROR %d",err);
772                         CmiAbort("failed to change qp state to RTR");
773         }
774
775         MACHSTATE(3,"qp state changed to RTR");
776         
777         attr.qp_state       = IBV_QPS_RTS;
778         attr.timeout        = 26;
779         attr.retry_cnt      = 20;
780         attr.rnr_retry      = 7;
781         attr.sq_psn         = context->localAddr[node].psn;
782         attr.max_rd_atomic  = 1;
783
784         
785         if (ibv_modify_qp(ret->qp, &attr,
786           IBV_QP_STATE              |
787           IBV_QP_TIMEOUT            |
788           IBV_QP_RETRY_CNT          |
789           IBV_QP_RNR_RETRY          |
790           IBV_QP_SQ_PSN             |
791           IBV_QP_MAX_QP_RD_ATOMIC)) {
792                         fprintf(stderr, "Failed to modify QP to RTS\n");
793                         exit(1);
794         }
795         MACHSTATE(3,"qp state changed to RTS");
796
797         MACHSTATE(3,"} initInfiOtherNodeData");
798         return ret;
799 }
800
801
802 void    infiPostInitialRecvs(){
803         //create the pool and post the receives
804         int numPosts;
805 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
806                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
807         }else{
808                 numPosts = maxRecvBuffers;
809         }*/
810
811         if(_Cmi_numnodes > 1){
812                 numPosts = maxRecvBuffers;
813         }else{
814                 numPosts = 0;
815         }
816         if(numPosts > 0){
817                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
818                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
819         }
820
821
822         free(context->qp);
823         context->qp = NULL;
824         free(context->localAddr);
825         context->localAddr= NULL;
826 }
827
828 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
829         int numBuffers;
830         int i;
831         int bigSize;
832         char *bigBuf;
833         struct infiBufferPool *ret;
834         struct ibv_mr *bigKey;
835
836         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
837
838         page_size = sysconf(_SC_PAGESIZE);
839         ret = malloc(sizeof(struct infiBufferPool));
840         ret->next = NULL;
841         numBuffers=ret->numBuffers = numRecvs;
842         
843         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
844         
845         bigSize = numBuffers*sizePerBuffer;
846         bigBuf=malloc(bigSize);
847         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
848         CmiAssert(bigKey != NULL);
849         
850         for(i=0;i<numBuffers;i++){
851                 struct infiBuffer *buffer =  &(ret->buffers[i]);
852                 buffer->type = BUFFER_RECV;
853                 buffer->size = sizePerBuffer;
854                 
855                 /*buffer->buf = malloc(sizePerBuffer);
856                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
857                 buffer->buf = &bigBuf[i*sizePerBuffer];
858                 buffer->key = bigKey;
859
860                 if(buffer->key == NULL){
861                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
862                         CmiAssert(buffer->key != NULL);
863                 }
864         }
865         return ret;
866 };
867
868
869
870 /**
871          Post the buffers as recv work requests
872 */
873 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
874         int j,err;
875         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
876         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
877         struct ibv_recv_wr *bad_wr;
878         
879         int startBufferIdx=0;
880         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
881         for(j=0;j<numRecvs;j++){
882                 
883                 
884                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
885                 sgElements[j].length = sizePerBuffer;
886                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
887                 
888                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
889                 workRequests[j].sg_list = &sgElements[j];
890                 workRequests[j].num_sge = 1;
891                 if(j != numRecvs-1){
892                         workRequests[j].next = &workRequests[j+1];
893                 }
894                 
895         }
896         workRequests[numRecvs-1].next = NULL;
897         MACHSTATE(3,"About to call ibv_post_srq_recv");
898         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
899                 CmiAssert(0);
900         }
901
902         free(workRequests);
903         free(sgElements);
904 }
905
906
907
908
909 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
910
911 static void CmiMachineExit()
912 {
913 #if CMK_IBVERBS_STATS   
914         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
915 #endif
916
917 #if THREAD_MULTI_POOL
918         printf("IBVERBS Multi pool version\n");
919 #endif
920
921         //TODO: erase this
922         printf("Alloc time = %.8f  Lock time = %.8f count = %d\n",processBufferedTime/processBufferedCount,processLockTime/processBufferedCount,processBufferedCount);
923
924         //TODO: ERASE this
925         MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
926 }
927 static void ServiceCharmrun_nolock();
928
929 static void CmiNotifyStillIdle(CmiIdleState *s) {
930 #if CMK_SMP
931         CmiCommLock();
932 /*      if(where == COMM_SERVER_FROM_SMP)*/
933 #endif
934 /*              ServiceCharmrun_nolock();*/
935
936         CommunicationServer_nolock(0);
937 #if CMK_SMP
938         CmiCommUnlock();
939 #endif
940 }
941
942 static inline void increaseTokens(OtherNode node);
943
944 static inline int pollRecvCq(const int toBuffer);
945 static inline int pollSendCq(const int toBuffer);
946
947
/* Wait until at least one send token is available.
 *
 * This is a no-op unless CMK_IBVERBS_TOKENS_FLOW is enabled. Otherwise, while
 * context->tokensLeft is zero, it runs the communication server with
 * toBuffer = 1, so received messages are buffered, not processed.
 * infiData is unused: tokens are tracked per context, not per node.
 */
static inline void getFreeTokens(struct infiOtherNodeData *infiData){
#if CMK_IBVERBS_TOKENS_FLOW
        if(context->tokensLeft != 0){
                return;         /* tokens available: nothing to wait for */
        }
        MACHSTATE(3,"GET FREE TOKENS {{{");
        while(context->tokensLeft == 0){
                CommunicationServer_nolock(1);
        }
        MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
#else
        (void)infiData;
#endif
}
964
965
966 /**
967         Packetize this data and send it
968 **/
969
970
971
972 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
973         int incTokens=0;
974         int retval;
975 #if     CMK_IBVERBS_DEBUG
976         packet->header.psn = (++node->infiData->psn);
977 #endif  
978
979
980
981         packet->elemList[1].addr = (uintptr_t)packet->buf;
982         packet->elemList[1].length = size;
983         packet->elemList[1].lkey = dataKey->lkey;
984         
985         
986         packet->destNode = node;
987         
988 #if CMK_IBVERBS_STATS   
989         pktCount++;
990 #endif  
991         
992         getFreeTokens(node->infiData);
993
994 #if CMK_IBVERBS_INCTOKENS       
995         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
996                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
997                 incTokens=1;
998         }
999 #endif
1000 /*
1001         if(!checkQp(node->infiData->qp)){
1002                 pollSendCq(1);
1003                 CmiAssert(0);
1004         }*/
1005
1006         struct ibv_send_wr *bad_wr=NULL;
1007         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1008                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
1009                 CmiAssert(0);
1010         }
1011 #if     CMK_IBVERBS_TOKENS_FLOW
1012         context->tokensLeft--;
1013 #if     CMK_IBVERBS_STATS
1014         if(context->tokensLeft < minTokensLeft){
1015                 minTokensLeft = context->tokensLeft;
1016         }
1017 #endif
1018 #endif
1019
1020 /*      if(!checkQp(node->infiData->qp)){
1021                 pollSendCq(1);
1022                 CmiAssert(0);
1023         }*/
1024
1025 #if CMK_IBVERBS_INCTOKENS       
1026         if(incTokens){
1027                 increaseTokens(node);
1028         }
1029 #endif
1030
1031
1032 #if     CMK_IBVERBS_DEBUG
1033         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1034 #else
1035         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1036 #endif
1037
1038 };
1039
1040
1041 static void inline EnqueueDummyPacket(OtherNode node,int size){
1042         infiPacket packet;
1043         MallocInfiPacket(packet);
1044         packet->size = size;
1045         packet->buf = CmiAlloc(size);
1046         
1047         packet->header.code = INFIDUMMYPACKET;
1048
1049         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1050         
1051         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1052         EnqueuePacket(node,packet,size,key);
1053 }
1054
1055
1056
1057
1058
1059
1060 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1061         infiPacket packet;
1062         MallocInfiPacket(packet);
1063         packet->size = size;
1064         packet->buf=data;
1065         
1066         //the nodeNo is added at time of packet allocation
1067         packet->header.code = INFIPACKETCODE_DATA;
1068         
1069         ogm->refcount++;
1070         packet->ogm = ogm;
1071         
1072         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1073         CmiAssert(key != NULL);
1074         
1075         EnqueuePacket(node,packet,size,key);
1076 };
1077
1078 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1079 static inline void processAllBufferedMsgs();
1080
1081 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1082         int size; char *data;
1083 //      processAllBufferedMsgs();
1084
1085         
1086         ogm->refcount++;
1087   size = ogm->size;
1088   data = ogm->data;
1089
1090 #if CMK_IBVERBS_STATS   
1091         msgCount++;
1092 #endif
1093
1094         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1095         //First packet has dgram header, other packets dont
1096         
1097   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1098         
1099         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1100
1101         if(rdma && size > rdmaThreshold){
1102                         EnqueueRdmaPacket(ogm,node);
1103         }else{
1104         
1105                 while(size > dataSize){
1106                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1107                         size -= dataSize;
1108                         data += dataSize;
1109                 }
1110                 if(size > 0){
1111                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1112                 }
1113         }
1114 //#if   !CMK_SMP
1115         processAllBufferedMsgs();
1116 //#endif
1117         ogm->refcount--;
1118         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1119 }
1120
1121
1122 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1123         infiPacket packet;
1124
1125         ogm->refcount++;
1126         
1127         MallocInfiPacket(packet);
1128  
1129  {
1130                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1131
1132                 
1133                 packet->size = sizeof(struct infiRdmaPacket);
1134                 packet->buf = (char *)rdmaPacket;
1135                 
1136 /*              struct ibv_mr *key = ibv_reg_mr(context->pd,ogm->data,ogm->size,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE);*/
1137                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1138                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1139                 
1140                 packet->header.code = INFIRDMA_START;
1141                 packet->header.nodeNo = _Cmi_mynode;
1142                 packet->ogm = NULL;
1143
1144                 rdmaPacket->type = INFI_MESG;
1145                 rdmaPacket->ogm = ogm;
1146                 rdmaPacket->key = *key;
1147                 rdmaPacket->keyPtr = key;
1148                 rdmaPacket->remoteBuf = ogm->data;
1149                 rdmaPacket->remoteSize = ogm->size;
1150                 
1151                 
1152                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1153                 
1154                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1155                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1156         }
1157 }
1158
1159
1160
1161 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1162 static inline void processSendWC(struct ibv_wc *sendWC);
1163 static unsigned int _count=0;
1164 extern int errno;
1165 static int _countAsync=0;
1166 static inline void processAsyncEvents(){
1167         struct ibv_async_event event;
1168         int ready;
1169         _countAsync++;
1170         if(_countAsync < 1){
1171                 return;
1172         }
1173         _countAsync=0;
1174         FD_SET(context->context->async_fd,&context->asyncFds);
1175         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1176         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1177         
1178         if(ready==0){
1179                 return;
1180         }
1181         if(ready == -1){
1182 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1183                 return;
1184         }
1185         
1186         if (ibv_get_async_event(context->context, &event)){
1187                 return;
1188                 CmiAbort("get async event failed");
1189         }
1190         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1191         ibv_ack_async_event(&event);
1192
1193         
1194 }
1195
1196 static void pollCmiDirectQ();
1197
1198 static inline  void CommunicationServer_nolock(int toBuffer) {
1199         int processed;
1200         if(_Cmi_numnodes <= 1){
1201                 pollCmiDirectQ();
1202                 return;
1203         }
1204         MACHSTATE(2,"CommServer_nolock{");
1205         
1206 //      processAsyncEvents();
1207         
1208 //      checkAllQps();
1209
1210         pollCmiDirectQ();
1211
1212         processed = pollRecvCq(toBuffer);
1213         
1214
1215         processed += pollSendCq(toBuffer);
1216         
1217         if(toBuffer == 0){
1218 //              if(processed != 0)
1219                         processAllBufferedMsgs();
1220         }
1221         
1222 //      checkAllQps();
1223 //      _count--;
1224         MACHSTATE(2,"} CommServer_nolock ne");
1225         
1226 }
1227 /*
1228 static inline infiBufferedWC createInfiBufferedWC(){
1229         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1230         ret->count = 0;
1231         ret->next = ret->prev =NULL;
1232         return ret;
1233 }*/
1234
1235 /****
1236         The buffered recvWC are stored in a doubly linked list of 
1237         arrays or blocks of wcs.
1238         To keep the average insert cost low, a new block is added 
1239         to the top of the list. (resulting in a reverse seq of blocks)
1240         Within a block however wc are stored in a sequence
1241 *****/
1242 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1243         infiBufferedWC block;
1244         MACHSTATE(3,"Insert Buffered Recv called");
1245         if( context->infiBufferedRecvList ==NULL){
1246                 context->infiBufferedRecvList = createInfiBufferedWC();
1247                 block = context->infiBufferedRecvList;
1248         }else{
1249                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1250                         block = createInfiBufferedWC();
1251                         context->infiBufferedRecvList->prev = block;
1252                         block->next = context->infiBufferedRecvList;
1253                         context->infiBufferedRecvList = block;
1254                 }else{
1255                         block = context->infiBufferedRecvList;
1256                 }
1257         }
1258         
1259         block->wcList[block->count] = *wc;
1260         block->count++;
1261 };
1262 */
1263
1264
1265 /********
1266 go through the blocks of bufferedWC. Process the last block first.
1267 Then the next one and so on. (Processing within a block should happen
1268 in sequence).
1269 Leave the last block in place to avoid having to allocate again
1270 ******/
1271 /*static inline void processBufferedRecvList(){
1272         infiBufferedWC start;
1273         start = context->infiBufferedRecvList;
1274         while(start->next != NULL){
1275                 start = start->next;
1276         }
1277         while(start != NULL){
1278                 int i=0;
1279                 infiBufferedWC tmp;
1280                 for(i=0;i<start->count;i++){
1281                         processRecvWC(&start->wcList[i]);
1282                 }
1283                 if(start != context->infiBufferedRecvList){
1284                         //not the first one
1285                         tmp = start;
1286                         start = start->prev;
1287                         free(tmp);
1288                         start->next = NULL;
1289                 }else{
1290                         start = start->prev;
1291                 }
1292         }
1293         context->infiBufferedRecvList->next = NULL;
1294         context->infiBufferedRecvList->prev = NULL;
1295         context->infiBufferedRecvList->count = 0;
1296 }
1297 */
1298
1299 static inline int pollRecvCq(const int toBuffer){
1300         int i;
1301         int ne;
1302         struct ibv_wc wc[WC_LIST_SIZE];
1303         
1304         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1305         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1306 //      CmiAssert(ne >=0);
1307         
1308         if(ne != 0){
1309                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1310         }
1311         
1312         for(i=0;i<ne;i++){
1313                 if(wc[i].status != IBV_WC_SUCCESS){
1314                         CmiAssert(0);
1315                 }
1316                 switch(wc[i].opcode){
1317                         case IBV_WC_RECV:
1318                                         processRecvWC(&wc[i],toBuffer);
1319                                 break;
1320                         default:
1321                                 CmiAbort("Wrong type of work completion object in recvq");
1322                                 break;
1323                 }
1324                         
1325         }
1326         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1327         return ne;
1328
1329 }
1330
1331 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1332
1333 static inline int pollSendCq(const int toBuffer){
1334         int i;
1335         int ne;
1336         struct ibv_wc wc[WC_LIST_SIZE];
1337
1338         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1339 //      CmiAssert(ne >=0);
1340         
1341         
1342         for(i=0;i<ne;i++){
1343                 if(wc[i].status != IBV_WC_SUCCESS){
1344                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1345 #if CMK_IBVERBS_STATS
1346         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1347 #endif
1348                         CmiAssert(0);
1349                 }
1350                 switch(wc[i].opcode){
1351                         case IBV_WC_SEND:{
1352                                 //message received
1353                                 processSendWC(&wc[i]);
1354                                 
1355                                 break;
1356                                 }
1357                         case IBV_WC_RDMA_READ:
1358                         {
1359 //                              processRdmaWC(&wc[i],toBuffer);
1360                                         processRdmaWC(&wc[i],1);
1361                                 break;
1362                         }
1363                         case IBV_WC_RDMA_WRITE:
1364                         {
1365                                 /*** used for CmiDirect puts 
1366                                 Nothing needs to be done on the sender side once send is done **/
1367                                 break;
1368                         }
1369                         default:
1370                                 CmiAbort("Wrong type of work completion object in recvq");
1371                                 break;
1372                 }
1373                         
1374         }
1375         return ne;
1376 }
1377
1378
1379 /******************
1380 Check the communication server socket and
1381
1382 *****************/
1383 int CheckSocketsReady(int withDelayMs)
1384 {   
1385   int nreadable;
1386   CMK_PIPE_DECL(withDelayMs);
1387
1388   CmiStdoutAdd(CMK_PIPE_SUB);
1389   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1390
1391   nreadable=CMK_PIPE_CALL();
1392   ctrlskt_ready_read = 0;
1393   dataskt_ready_read = 0;
1394   dataskt_ready_write = 0;
1395   
1396   if (nreadable == 0) {
1397     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1398     return nreadable;
1399   }
1400   if (nreadable==-1) {
1401     CMK_PIPE_CHECKERR();
1402     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1403     return CheckSocketsReady(0);
1404   }
1405   CmiStdoutCheck(CMK_PIPE_SUB);
1406   if (Cmi_charmrun_fd!=-1) 
1407           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1408   MACHSTATE(1,"} CheckSocketsReady")
1409   return nreadable;
1410 }
1411
1412
1413 /*** Service the charmrun socket
1414 *************/
1415
1416 static void ServiceCharmrun_nolock()
1417 {
1418   int again = 1;
1419   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1420   while (again)
1421   {
1422   again = 0;
1423   CheckSocketsReady(0);
1424   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1425   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1426   }
1427   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1428 }
1429
1430
1431
1432 static void CommunicationServer(int sleepTime, int where){
1433         if( where == COMM_SERVER_FROM_INTERRUPT){
1434                 return;
1435         }
1436 #if CMK_SMP
1437         if(where == COMM_SERVER_FROM_WORKER){
1438                 return;
1439         }
1440         if(where == COMM_SERVER_FROM_SMP){
1441 #endif
1442                 ServiceCharmrun_nolock();
1443 #if CMK_SMP
1444         }
1445         CmiCommLock();
1446 #endif
1447         CommunicationServer_nolock(0);
1448 #if CMK_SMP
1449         CmiCommUnlock();
1450 #endif
1451 }
1452
1453
1454 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
1455
1456
1457 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
1458
1459 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
1460         char *newmsg;
1461         
1462         MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
1463         
1464         OtherNode node = &nodes[nodeNo];
1465         newmsg = node->asm_msg;         
1466         
1467         /// This simple state machine determines if this packet marks the beginning of a new message
1468         // from another node, or if this is another in a sequence of packets
1469         switch(node->infiData->state){
1470                 case INFI_HEADER_DATA:
1471                 {
1472                         int size;
1473                         int rank, srcpe, seqno, magic, i;
1474                         unsigned int broot;
1475                         DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1476                         size = CmiMsgHeaderGetLength(msg);
1477                         MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
1478 //                      CmiAssert(size > 0);
1479 //                      CmiAssert(nodes_by_pe[srcpe] == node);
1480                         
1481 //                      CmiAssert(newmsg == NULL);
1482                         if(len > size){
1483                                 CmiPrintf("size: %d, len:%d.\n", size, len);
1484                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1485                         }
1486                         newmsg = (char *)CmiAlloc(size);
1487       _MEMCHECK(newmsg);
1488       memcpy(newmsg, msg, len);
1489       node->asm_rank = rank;
1490       node->asm_total = size;
1491       node->asm_fill = len;
1492       node->asm_msg = newmsg;
1493                         node->infiData->broot = broot;
1494                         if(len == size){
1495                                 //this is the only packet for this message 
1496                                 node->infiData->state = INFI_HEADER_DATA;
1497                         }else{
1498                                 //there are more packets following
1499                                 node->infiData->state = INFI_DATA;
1500                         }
1501                         break;
1502                 }
1503                 case INFI_DATA:
1504                 {
1505                         if(node->asm_fill + len < node->asm_total && len != dataSize){
1506                                 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1507                                 CmiAbort("packet in the middle does not have expected length");
1508                         }
1509                         if(node->asm_fill+len > node->asm_total){
1510                                 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1511                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1512                         }
1513                         //tODO: remove this
1514                         memcpy(newmsg + node->asm_fill,msg,len);
1515                         node->asm_fill += len;
1516                         if(node->asm_fill == node->asm_total){
1517                                 node->infiData->state = INFI_HEADER_DATA;
1518                         }else{
1519                                 node->infiData->state = INFI_DATA;
1520                         }
1521                         break;
1522                 }
1523         }
1524         /// if this packet was the last packet in a message ie state was 
1525         /// reset to infi_header_data
1526         
1527         if(node->infiData->state == INFI_HEADER_DATA){
1528                 int total_size = node->asm_total;
1529                 node->asm_msg = NULL;
1530                 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
1531                 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
1532         }
1533         
1534 };
1535
1536 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
1537 #if CMK_BROADCAST_SPANNING_TREE
1538         if (rank == DGRAM_BROADCAST
1539 #if CMK_NODE_QUEUE_AVAILABLE
1540           || rank == DGRAM_NODEBROADCAST
1541 #endif
1542          ){
1543                                          if(toBuffer){
1544                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1545                                                 }else{
1546                         SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
1547                                                 }
1548                                         }
1549 #elif CMK_BROADCAST_HYPERCUBE
1550         if (rank == DGRAM_BROADCAST
1551 #if CMK_NODE_QUEUE_AVAILABLE
1552           || rank == DGRAM_NODEBROADCAST
1553 #endif
1554          ){
1555                                          if(toBuffer){
1556                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1557                                          }else{
1558                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
1559                                                 }
1560                                         }
1561 #endif
1562
1563
1564                 
1565                 switch (rank) {
1566         case DGRAM_BROADCAST: {
1567                                 int i;
1568                                 for (i=1; i<_Cmi_mynodesize; i++){
1569                                         CmiPushPE(i, CopyMsg(newmsg, total_size));
1570                                 }
1571           CmiPushPE(0, newmsg);
1572           break;
1573       }
1574 #if CMK_NODE_QUEUE_AVAILABLE
1575         case DGRAM_NODEBROADCAST: 
1576         case DGRAM_NODEMESSAGE: {
1577           CmiPushNode(newmsg);
1578           break;
1579         }
1580 #endif
1581         default:
1582                                 {
1583                                         
1584           CmiPushPE(rank, newmsg);
1585                                 }
1586         }    /* end of switch */
1587                 if(!toBuffer){
1588 //#if !CMK_SMP          
1589                 processAllBufferedMsgs();
1590 //#endif
1591                 }
1592 }
1593
1594
1595 static inline void increasePostedRecvs(int nodeNo);
1596 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1597 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1598
1599 //struct infiDirectRequestPacket;
1600 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1601
1602 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1603         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1604         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1605         int nodeNo = header->nodeNo;
1606 #if     CMK_IBVERBS_DEBUG
1607         OtherNode node = &nodes[nodeNo];
1608 #endif
1609         
1610         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1611 #if     CMK_IBVERBS_DEBUG
1612         if(node->infiData->recvPsn == 0){
1613                 node->infiData->recvPsn = header->psn;
1614         }else{
1615                 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1616                 node->infiData->recvPsn++;
1617         }
1618         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1619 #else
1620         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1621 #endif
1622         
1623         if(header->code & INFIPACKETCODE_DATA){
1624                         
1625                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1626         }
1627         if(header->code & INFIDUMMYPACKET){
1628                 MACHSTATE(3,"Dummy packet");
1629         }
1630         if(header->code & INFIBARRIERPACKET){
1631                 MACHSTATE(3,"Barrier packet");
1632                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1633         }
1634
1635 #if CMK_IBVERBS_INCTOKENS       
1636         if(header->code & INFIPACKETCODE_INCTOKENS){
1637                 increasePostedRecvs(nodeNo);
1638         }
1639 #endif  
1640         if(rdma && header->code & INFIRDMA_START){
1641                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1642 //              if(toBuffer){
1643                         //TODO: make a function of this and use for both acks and requests
1644                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1645                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1646                         *copyPacket = *rdmaPacket;
1647                         copyPacket->fromNodeNo = nodeNo;
1648                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1649                         context->bufferedRdmaRequests = copyPacket;
1650                         copyPacket->next = tmp;
1651                         copyPacket->prev = NULL;
1652                         if(tmp != NULL){
1653                                 tmp->prev = copyPacket;
1654                         }
1655 /*              }else{
1656                         processRdmaRequest(rdmaPacket,nodeNo,0);
1657                 }*/
1658         }
1659         if(rdma && header->code & INFIRDMA_ACK){
1660                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1661                 processRdmaAck(rdmaPacket);
1662         }
1663 /*      if(header->code & INFIDIRECT_REQUEST){
1664                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1665                 processDirectRequest(directRequestPacket);
1666         }*/
1667         {
1668                 struct ibv_sge list = {
1669                         .addr   = (uintptr_t) buffer->buf,
1670                         .length = buffer->size,
1671                         .lkey   = buffer->key->lkey
1672                 };
1673         
1674                 struct ibv_recv_wr wr = {
1675                         .wr_id = (uint64_t)buffer,
1676                         .sg_list = &list,
1677                         .num_sge = 1,
1678                         .next = NULL
1679                 };
1680                 struct ibv_recv_wr *bad_wr;
1681         
1682                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1683                         CmiAssert(0);
1684                 }
1685         }
1686
1687 };
1688
1689
1690
1691
1692 static inline  void processSendWC(struct ibv_wc *sendWC){
1693
1694         infiPacket packet = (infiPacket )sendWC->wr_id;
1695 #if CMK_IBVERBS_TOKENS_FLOW
1696 //      packet->destNode->infiData->tokensLeft++;
1697         context->tokensLeft++;
1698 #endif
1699
1700         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1701         if(packet->ogm != NULL){
1702                 packet->ogm->refcount--;
1703                 if(packet->ogm->refcount == 0){
1704                         GarbageCollectMsg(packet->ogm); 
1705                 }
1706         }else{
1707                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1708
1709                 }
1710         }
1711
1712         FreeInfiPacket(packet);
1713 };
1714
1715
1716
1717 /********************************************************************/
1718 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1719         int nodeNo = fromNodeNo;
1720         OtherNode node = &nodes[nodeNo];
1721         struct infiRdmaPacket *rdmaPacket;
1722
1723         getFreeTokens(node->infiData);
1724 #if CMK_IBVERBS_TOKENS_FLOW
1725 //      node->infiData->tokensLeft--;
1726         context->tokensLeft--;
1727 #if     CMK_IBVERBS_STATS
1728         if(context->tokensLeft < minTokensLeft){
1729                 minTokensLeft = context->tokensLeft;
1730         }
1731 #endif
1732 #endif
1733         
1734         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1735 //      CmiAssert(buffer != NULL);
1736         
1737         
1738         if(isBuffered){
1739                 rdmaPacket = _rdmaPacket;
1740         }else{
1741                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1742                 *rdmaPacket = *_rdmaPacket;
1743         }
1744
1745
1746         rdmaPacket->fromNodeNo = fromNodeNo;
1747         rdmaPacket->localBuffer = (void *)buffer;
1748         
1749         buffer->type = BUFFER_RDMA;
1750         buffer->size = rdmaPacket->remoteSize;
1751         
1752         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1753 //      CmiAssert(buffer->buf != NULL);
1754
1755         buffer->key = METADATAFIELD(buffer->buf)->key;
1756
1757         
1758         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1759         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1760 //      CmiAssert(buffer->key != NULL);
1761         
1762         {
1763                 struct ibv_sge list = {
1764                         .addr = (uintptr_t )buffer->buf,
1765                         .length = buffer->size,
1766                         .lkey   = buffer->key->lkey
1767                 };
1768
1769                 struct ibv_send_wr *bad_wr;
1770                 struct ibv_send_wr wr = {
1771                         .wr_id = (uint64_t )rdmaPacket,
1772                         .sg_list = &list,
1773                         .num_sge = 1,
1774                         .opcode = IBV_WR_RDMA_READ,
1775                         .send_flags = IBV_SEND_SIGNALED,
1776                         .wr.rdma = {
1777                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1778                                 .rkey = rdmaPacket->key.rkey
1779                         }
1780                 };
1781                 /** post and rdma_read that is a rdma get*/
1782                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1783                         CmiAssert(0);
1784                 }
1785         }
1786
1787 };
1788
1789 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
1790 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1791
1792 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1793                 //rdma get done
1794 #if CMK_IBVERBS_STATS
1795         double _startRegTime;
1796 #endif  
1797
1798         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1799 /*      if(rdmaPacket->type == INFI_DIRECT){
1800                 processDirectWC(rdmaPacket);
1801                 return;
1802         }*/
1803 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1804         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1805
1806         /*TODO: remove this
1807         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1808         
1809 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1810         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1811         
1812         {
1813                 int size;
1814                 int rank, srcpe, seqno, magic, i;
1815                 unsigned int broot;
1816                 char *msg = buffer->buf;
1817                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1818                 size = CmiMsgHeaderGetLength(msg);
1819 /*              CmiAssert(size == buffer->size);*/
1820                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1821         }
1822         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1823
1824         
1825         free(buffer);
1826         
1827         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1828         //we are sending this ack as a response to a successful
1829         // rdma_Read.. the token for that rdma_Read needs to be freed
1830 #if CMK_IBVERBS_TOKENS_FLOW     
1831         //node->infiData->tokensLeft++;
1832         context->tokensLeft++;
1833 #endif
1834
1835         //send ack to sender if toBuffer is off otherwise buffer it
1836         if(toBuffer){
1837                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1838                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1839                 context->bufferedRdmaAcks = rdmaPacket;
1840                 rdmaPacket->next = tmp;
1841                 rdmaPacket->prev = NULL;
1842                 if(tmp != NULL){
1843                         tmp->prev = rdmaPacket; 
1844                 }
1845         }else{
1846                 EnqueueRdmaAck(rdmaPacket);             
1847                 free(rdmaPacket);
1848         }
1849 }
1850
1851 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1852         infiPacket packet;
1853         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1854
1855         
1856         MallocInfiPacket(packet);
1857         {
1858                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1859                 *ackPacket = *rdmaPacket;
1860                 packet->size = sizeof(struct infiRdmaPacket);
1861                 packet->buf = (char *)ackPacket;
1862                 packet->header.code = INFIRDMA_ACK;
1863                 packet->ogm=NULL;
1864                 
1865                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1866         
1867
1868                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1869         }
1870 };
1871
1872
1873 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1874         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1875         rdmaPacket->ogm->refcount--;
1876         GarbageCollectMsg(rdmaPacket->ogm);
1877 }
1878
1879
1880 /****************************
1881  Deal with all the buffered (delayed) messages
1882  such as processing recvd broadcasts, sending
1883  rdma acks and processing recvd rdma requests
1884 ******************************/
1885
1886
1887 static inline infiBufferedBcastPool createBcastPool(){
1888         int i;
1889         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1890         ret->count = 0;
1891         ret->next = ret->prev = NULL;   
1892         for(i=0;i<BCASTLIST_SIZE;i++){
1893                 ret->bcastList[i].valid = 0;
1894         }
1895         return ret;
1896 };
1897 /****
1898         The buffered bcast messages are stored in a doubly linked list of 
1899         arrays or blocks.
1900         To keep the average insert cost low, a new block is added 
1901         to the top of the list. (resulting in a reverse seq of blocks)
1902         Within a block however bcast are stored in increasing order sequence
1903 *****/
1904
1905 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1906         if(context->bufferedBcastList == NULL){
1907                 context->bufferedBcastList = createBcastPool();
1908         }else{
1909                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1910                         infiBufferedBcastPool tmp;
1911                         tmp = createBcastPool();
1912                         context->bufferedBcastList->prev = tmp;
1913                         tmp->next = context->bufferedBcastList;
1914                         context->bufferedBcastList = tmp;
1915                 }
1916         }
1917         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1918         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1919         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1920         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1921         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1922         
1923         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1924         
1925         context->bufferedBcastList->count++;
1926 }
1927
1928 /*********
1929         Go through the blocks of buffered bcast messages. process last block first
1930         processign within a block is in sequence though
1931 *********/
1932 static inline void processBufferedBcast(){
1933         infiBufferedBcastPool start;
1934
1935         if(context->bufferedBcastList == NULL){
1936                 return;
1937         }
1938         start = context->bufferedBcastList;
1939         if(context->insideProcessBufferedBcasts==1){
1940                 return;
1941         }
1942         context->insideProcessBufferedBcasts=1;
1943
1944         while(start->next != NULL){
1945                 start = start->next;
1946         }
1947         
1948         while(start != NULL){
1949                 int i=0;
1950                 infiBufferedBcastPool tmp;
1951                 if(start->count != 0){
1952                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1953                 }
1954                 for(i=0;i<start->count;i++){
1955                         if(start->bcastList[i].valid == 0){
1956                                 continue;
1957                         }
1958                         start->bcastList[i].valid=0;
1959                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1960 #if CMK_BROADCAST_SPANNING_TREE
1961         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1962 #if CMK_NODE_QUEUE_AVAILABLE
1963           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1964 #endif
1965          ){
1966                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
1967                                         }
1968 #elif CMK_BROADCAST_HYPERCUBE
1969         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1970 #if CMK_NODE_QUEUE_AVAILABLE
1971           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1972 #endif
1973          ){
1974                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
1975                                         }
1976 #endif
1977                 }
1978                 if(start->count != 0){
1979                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1980                 }
1981                 
1982                 tmp = start;
1983                 start = start->prev;
1984                 free(tmp);
1985                 if(start != NULL){
1986                         //not the first one
1987                         start->next = NULL;
1988                 }
1989         }
1990
1991         context->bufferedBcastList = NULL;
1992 /*      context->bufferedBcastList->prev = NULL;
1993         context->bufferedBcastList->count =0;   */
1994         context->insideProcessBufferedBcasts=0;
1995         MACHSTATE(2,"processBufferedBcast done ");
1996 };
1997
1998
1999 static inline void processBufferedRdmaAcks(){
2000         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
2001         if(start == NULL){
2002                 return;
2003         }
2004         while(start->next != NULL){
2005                 start = start->next;
2006         }
2007         while(start != NULL){
2008                 struct infiRdmaPacket *rdmaPacket=start;
2009                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
2010                 EnqueueRdmaAck(rdmaPacket);
2011                 start = start->prev;
2012                 free(rdmaPacket);
2013         }
2014         context->bufferedRdmaAcks=NULL;
2015 }
2016
2017
2018
2019 static inline void processBufferedRdmaRequests(){
2020         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2021         if(start == NULL){
2022                 return;
2023         }
2024         
2025         
2026         while(start->next != NULL){
2027                 start = start->next;
2028         }
2029         while(start != NULL){
2030                 struct infiRdmaPacket *rdmaPacket=start;
2031                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2032                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2033                 start = start->prev;
2034         }
2035         
2036         context->bufferedRdmaRequests=NULL;
2037 }
2038
2039
2040
2041
2042
/* Drain everything deferred while buffering: broadcasts first, then RDMA
   acks, then RDMA requests. With CMK_IBVERBS_STATS, the call count and
   elapsed wall time go into processBufferedCount / processBufferedTime.
   The timer lives in its own local so it does not shadow the file-scope
   _startTime. */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double drainStart = CmiWallTimer();
	processBufferedCount++;
#endif
	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();
#if CMK_IBVERBS_STATS
	processBufferedTime += CmiWallTimer() - drainStart;
#endif
}
2056
2057
2058 /*************************
2059         Increase tokens when short of them
2060 **********/
2061 static inline void increaseTokens(OtherNode node){
2062         int err;
2063         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2064         if(node->infiData->totalTokens + increase > maxTokens){
2065                 increase = maxTokens-node->infiData->totalTokens;
2066         }
2067         node->infiData->totalTokens += increase;
2068         node->infiData->tokensLeft += increase;
2069         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2070         //increase the size of the sendCq
2071         int currentCqSize = context->sendCqSize;
2072         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2073                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2074                 CmiAssert(0);
2075         }
2076         context->sendCqSize+= increase;
2077 };
2078
2079 static void increasePostedRecvs(int nodeNo){
2080         OtherNode node = &nodes[nodeNo];
2081         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2082         int recvIncrease = tokenIncrease;
2083         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2084                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2085         }
2086         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2087                 recvIncrease = maxRecvBuffers-context->srqSize;
2088         }
2089         node->infiData->postedRecvs+= recvIncrease;
2090         context->srqSize += recvIncrease;
2091         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2092         //increase the size of the recvCq
2093         int currentCqSize = context->recvCqSize;
2094         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2095                 CmiAssert(0);
2096         }
2097         context->recvCqSize += tokenIncrease;
2098         if(recvIncrease > 0){
2099                 //create another bufferPool and attach it to the top of the current one
2100                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2101                 newPool->next = context->recvBufferPool;
2102                 context->recvBufferPool = newPool;
2103                 postInitialRecvs(newPool,recvIncrease,packetSize);
2104         }
2105
2106 };
2107
2108
2109
2110
2111 /*********************************************
2112         Memory management routines for RDMA
2113
2114 ************************************************/
2115
2116 /**
2117         There are INFINUMPOOLS of memory.
2118         The first pool is of size firstBinSize.
2119         The ith pool is of size firstBinSize*2^i
2120 */
2121
2122 static void initInfiCmiChunkPools(){
2123         int i,j;
2124         int size = firstBinSize;
2125         int nodeSize;
2126
2127 #if THREAD_MULTI_POOL
2128         nodeSize = CmiMyNodeSize() + 1;
2129         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2130         for(i = 0; i < nodeSize; i++){
2131                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2132         }
2133         for(j = 0; j < nodeSize; j++){
2134                 size = firstBinSize;
2135                 for(i=0;i<INFINUMPOOLS;i++){
2136                         infiCmiChunkPools[j][i].size = size;
2137                         infiCmiChunkPools[j][i].startBuf = NULL;
2138                         infiCmiChunkPools[j][i].count = 0;
2139                         size *= 2;
2140                 }
2141         }
2142
2143         // creating the n^2 system of queues
2144         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2145         for(i = 0; i < nodeSize; i++){
2146                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2147         }
2148         for(i = 0; i < nodeSize; i++)
2149                 for(j = 0; j < nodeSize; j++)
2150                         queuePool[i][j] = PCQueueCreate();
2151
2152 #else
2153
2154         size = firstBinSize;    
2155         for(i=0;i<INFINUMPOOLS;i++){
2156                 infiCmiChunkPools[i].size = size;
2157                 infiCmiChunkPools[i].startBuf = NULL;
2158                 infiCmiChunkPools[i].count = 0;
2159                 size *= 2;
2160         }
2161 #endif
2162
2163 }
2164
2165 /***********
2166 Register memory for a part of a received multisend message
2167 *************/
2168 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2169         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2170         char *res=msg-sizeof(infiCmiChunkHeader);
2171         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2172         metaData->owner = NULL;
2173         metaData->poolIdx = INFIMULTIPOOL;
2174
2175         return metaData;
2176 };
2177
2178
2179 #if THREAD_MULTI_POOL
2180
2181 // Fills up the buffer pools for every thread in the node
2182 static inline void fillBufferPools(){
2183         int nodeSize, poolIdx, thread;
2184         infiCmiChunkMetaData *metaData;         
2185         infiCmiChunkHeader *hdr;
2186         int allocSize;
2187         int count=1;
2188         int i;
2189         struct ibv_mr *key;
2190         void *res;
2191
2192         // initializing values
2193         nodeSize = CmiMyNodeSize() + 1;
2194
2195         // iterating over all threads and all pools
2196         for(thread = 0; thread < nodeSize; thread++){
2197                 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2198                         allocSize = infiCmiChunkPools[thread][poolIdx].size;
2199                         if(poolIdx < blockThreshold){
2200                                 count = blockAllocRatio;
2201                         }else{
2202                                 count = 1;
2203                         }
2204                         res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2205                         hdr = res;
2206                         key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2207                         CmiAssert(key != NULL);
2208                         res += sizeof(infiCmiChunkHeader);
2209                         for(i=0;i<count;i++){
2210                                 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2211                                 metaData->key = key;
2212                                 metaData->owner = hdr;
2213                                 metaData->poolIdx = poolIdx;
2214                                 metaData->parentPe = thread;                                            // setting the parent PE
2215                                 if(i == 0){
2216                                         metaData->owner->metaData->count = count;
2217                                         metaData->nextBuf = NULL;
2218                                 }else{
2219                                         void *startBuf = res - sizeof(infiCmiChunkHeader);
2220                                         metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2221                                         infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2222                                         infiCmiChunkPools[thread][poolIdx].count++;
2223                                 }
2224                                 if(i != count-1){
2225                                         res += (allocSize+sizeof(infiCmiChunkHeader));
2226                                 }
2227                         }
2228                 }
2229         }       
2230 }
2231
2232 static inline void *getInfiCmiChunkThread(int dataSize){
2233         //find out to which pool this dataSize belongs to
2234         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2235         int ratio = dataSize/firstBinSize;
2236         int poolIdx=0;
2237         void *res;
2238         int i,j,nodeSize;
2239         void *pointer;
2240
2241         //printf("Hi\n");
2242         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2243         MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2244         
2245         while(ratio > 0){
2246                 ratio  = ratio >> 1;
2247                 poolIdx++;
2248         }
2249         MACHSTATE1(2,"This is %d",CmiMyRank());
2250         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2251
2252         // checking whether to analyze the free queues to reuse buffers 
2253         nodeSize = CmiMyNodeSize() + 1;
2254         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2255                 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2256                 for(i = 0; i < nodeSize; i++){
2257                         if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2258                                 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2259                                         pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2260                                         infi_CmiFreeDirect(pointer);    
2261                                 }
2262                         }
2263                 }       
2264         }
2265
2266         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2267                 infiCmiChunkMetaData *metaData;         
2268                 infiCmiChunkHeader *hdr;
2269                 int allocSize;
2270                 int count=1;
2271                 int i;
2272                 struct ibv_mr *key;
2273                 void *origres;
2274                 
2275                 
2276                 if(poolIdx < INFINUMPOOLS ){
2277                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2278                 }else{
2279                         allocSize = dataSize;
2280                 }
2281
2282                 if(poolIdx < blockThreshold){
2283                         count = blockAllocRatio;
2284                 }
2285                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2286                 hdr = res;
2287                 
2288                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2289                 CmiAssert(key != NULL);
2290                 
2291                 origres = (res += sizeof(infiCmiChunkHeader));
2292
2293                 for(i=0;i<count;i++){
2294                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2295                         metaData->key = key;
2296                         metaData->owner = hdr;
2297                         metaData->poolIdx = poolIdx;
2298                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2299
2300                         if(i == 0){
2301                                 metaData->owner->metaData->count = count;
2302                                 metaData->nextBuf = NULL;
2303                         }else{
2304                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2305                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2306                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2307                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2308                                 
2309                         }
2310                         if(i != count-1){
2311                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2312                         }
2313           }     
2314                 
2315                 
2316                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2317                 
2318                 return origres;
2319         }
2320         if(poolIdx < INFINUMPOOLS){
2321                 infiCmiChunkMetaData *metaData;                         
2322         
2323                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2324                 res += sizeof(infiCmiChunkHeader);
2325
2326                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2327                 metaData = METADATAFIELD(res);
2328
2329                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2330                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2331                 
2332                 metaData->nextBuf = NULL;
2333 //              CmiAssert(metaData->poolIdx == poolIdx);
2334
2335                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2336                 return res;
2337         }
2338
2339         CmiAssert(0);
2340
2341         
2342 };
2343 #else
2344 static inline void *getInfiCmiChunk(int dataSize){
2345         //find out to which pool this dataSize belongs to
2346         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2347         int ratio = dataSize/firstBinSize;
2348         int poolIdx=0;
2349         void *res;
2350
2351         while(ratio > 0){
2352                 ratio  = ratio >> 1;
2353                 poolIdx++;
2354         }
2355         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2356         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2357                 infiCmiChunkMetaData *metaData;
2358                 infiCmiChunkHeader *hdr;
2359                 int allocSize;
2360                 int count=1;
2361                 int i;
2362                 struct ibv_mr *key;
2363                 void *origres;
2364
2365
2366                 if(poolIdx < INFINUMPOOLS ){
2367                         allocSize = infiCmiChunkPools[poolIdx].size;
2368                 }else{
2369                         allocSize = dataSize;
2370                 }
2371
2372                 if(poolIdx < blockThreshold){
2373                         count = blockAllocRatio;
2374                 }
2375                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2376                 hdr = res;
2377
2378                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2379                 CmiAssert(key != NULL);
2380
2381                 origres = (res += sizeof(infiCmiChunkHeader));
2382
2383                 for(i=0;i<count;i++){
2384                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2385                         metaData->key = key;
2386                         metaData->owner = hdr;
2387                         metaData->poolIdx = poolIdx;
2388
2389                         if(i == 0){
2390                                 metaData->owner->metaData->count = count;
2391                                 metaData->nextBuf = NULL;
2392                         }else{
2393                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2394                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2395                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2396                                 infiCmiChunkPools[poolIdx].count++;
2397
2398                         }
2399                         if(i != count-1){
2400                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2401                         }
2402           }
2403
2404
2405                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2406
2407                 return origres;
2408         }
2409         if(poolIdx < INFINUMPOOLS){
2410                 infiCmiChunkMetaData *metaData;
2411
2412                 res = infiCmiChunkPools[poolIdx].startBuf;
2413                 res += sizeof(infiCmiChunkHeader);
2414
2415                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2416                 metaData = METADATAFIELD(res);
2417
2418                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2419                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2420
2421                 metaData->nextBuf = NULL;
2422 //              CmiAssert(metaData->poolIdx == poolIdx);
2423
2424                 infiCmiChunkPools[poolIdx].count--;
2425                 return res;
2426         }
2427
2428         CmiAssert(0);
2429
2430
2431 };
2432 #endif
2433
2434
2435 void * infi_CmiAlloc(int size){
2436         void *res;
2437
2438         //TODO: erase this
2439         _startTime = CmiWallTimer();    
2440
2441 #if THREAD_MULTI_POOL
2442         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2443         res -= sizeof(CmiChunkHeader);
2444
2445         //TODO: erase this      
2446         processBufferedTime += CmiWallTimer() - _startTime;
2447         processBufferedCount++;
2448
2449         return res;
2450 #else
2451 #if CMK_SMP
2452         _auxTime = CmiWallTimer();
2453         CmiMemLock();
2454         processLockTime += CmiWallTimer() - _auxTime;
2455 #endif
2456 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2457                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2458
2459                 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));     
2460                 res -= sizeof(CmiChunkHeader);
2461 #if CMK_SMP     
2462         _auxTime = CmiWallTimer();
2463         CmiMemUnlock();
2464         processLockTime += CmiWallTimer() - _auxTime;
2465 #endif
2466 /*      }else{
2467                 res = malloc(size);
2468         }*/
2469         
2470         //TODO: erase this      
2471         processBufferedTime += CmiWallTimer() - _startTime;
2472         processBufferedCount++;
2473
2474         return res;
2475 #endif
2476 }
2477
2478 #if THREAD_MULTI_POOL
//Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
/**
 * Release a chunk on the calling rank (THREAD_MULTI_POOL build).
 *
 * @param ptr  pointer to the chunk's user data, i.e. already past the
 *             CmiChunkHeader; the infiCmiChunkHeader sits right before it.
 *
 * If the chunk's poolIdx names a real pool (< INFINUMPOOLS) of this rank whose
 * count is at most INFIMAXPERPOOL, the chunk is pushed onto the front of that
 * pool's free list in infiCmiChunkPools[CmiMyRank()]. Otherwise the owner's
 * chunk count is decremented; when it reaches zero the memory region
 * (metaData->key) is deregistered and the owner's buffer and metadata are
 * freed. A non-owner chunk always frees its own metadata.
 */
void infi_CmiFreeDirect(void *ptr){
        int size;
        int parentPe;   /* NOTE(review): size and parentPe are not used below */
        void *freePtr = ptr;

        //ptr += sizeof(CmiChunkHeader);
        size = SIZEFIELD (ptr);
/*      if(size > firstBinSize){*/
        infiCmiChunkMetaData *metaData;
        int poolIdx;
        //there is a infiniband specific header
        /* NOTE(review): arithmetic on void * is a GNU extension (treated as char *) */
        freePtr = ptr - sizeof(infiCmiChunkHeader);
        metaData = METADATAFIELD(ptr);
        poolIdx = metaData->poolIdx;

        MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
//      CmiAssert(poolIdx >= 0);
        if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].count <= INFIMAXPERPOOL){
                /* recycle: push the chunk onto the front of this rank's pool free list */
                metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
                infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = freePtr;
                        infiCmiChunkPools[CmiMyRank()][poolIdx].count++;

                        MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf,infiCmiChunkPools[CmiMyRank()][poolIdx].count);
                }else{
                        /* pool missing or full: drop one reference on the owning allocation */
                        MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
                        metaData->owner->metaData->count--;
                        if(metaData->owner->metaData == metaData){
                                //I am the owner
                                if(metaData->owner->metaData->count == 0){
                                        //all the chunks have been freed
                                        ibv_dereg_mr(metaData->key);
                                        free(freePtr);
                                        free(metaData);
                                }
                                //if I am the owner and all the chunks have not been
                                // freed dont free my metaData. will need later
                        }else{
                                if(metaData->owner->metaData->count == 0){
                                        //need to free the owner's buffer and metadata
                                        freePtr = metaData->owner;
                                        ibv_dereg_mr(metaData->key);
                                        free(metaData->owner->metaData);
                                        free(freePtr);
                                }
                                /* a non-owner's metadata is always released */
                                free(metaData);
                        }
                }
}
2528
2529
2530 void infi_CmiFree(void *ptr){
2531         int i,j;
2532         int size;
2533         int parentPe;
2534         int nodeSize;
2535         void *pointer;
2536         void *freePtr = ptr;
2537         nodeSize = CmiMyNodeSize() + 1;
2538
2539         MACHSTATE(3,"Freeing");
2540
2541         ptr += sizeof(CmiChunkHeader);
2542         size = SIZEFIELD (ptr);
2543 /*      if(size > firstBinSize){*/
2544         infiCmiChunkMetaData *metaData;
2545         int poolIdx;
2546         //there is a infiniband specific header
2547         freePtr = ptr - sizeof(infiCmiChunkHeader);
2548         metaData = METADATAFIELD(ptr);
2549         poolIdx = metaData->poolIdx;
2550
2551         if(poolIdx == INFIMULTIPOOL){
2552                 /** this is a part of a received mult message  
2553                     it will be freed correctly later
2554                 **/
2555
2556                 return;
2557         }
2558
2559
2560         //TODO: erase this
2561         TESTfrees++;
2562
2563         // checking if this free operation is my responsibility
2564         parentPe = metaData->parentPe;
2565         if(parentPe != CmiMyRank()){
2566                 TESTneighbor++;
2567                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2568                 return;
2569         }
2570
2571
2572         infi_CmiFreeDirect(ptr);
2573
2574 // TODO: erase following code
2575 /*      // checking free request queues
2576         for(i = 0; i < nodeSize; i++){
2577                 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2578                         for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2579                                 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2580                                 infi_CmiFreeDirect(pointer);    
2581                         }
2582                 }
2583         }
2584 */
2585
2586 }
2587
2588 #else
2589 void infi_CmiFree(void *ptr){
2590         int size;
2591         void *freePtr = ptr;
2592         
2593 #if CMK_SMP     
2594         CmiMemLock();
2595 #endif
2596         ptr += sizeof(CmiChunkHeader);
2597         size = SIZEFIELD (ptr);
2598 /*      if(size > firstBinSize){*/
2599                 infiCmiChunkMetaData *metaData;
2600                 int poolIdx;
2601                 //there is a infiniband specific header
2602                 freePtr = ptr - sizeof(infiCmiChunkHeader);
2603                 metaData = METADATAFIELD(ptr);
2604                 poolIdx = metaData->poolIdx;
2605                 if(poolIdx == INFIMULTIPOOL){
2606                         /** this is a part of a received mult message  
2607                         it will be freed correctly later
2608                         **/
2609                         
2610                         return;
2611                 }
2612                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2613 //              CmiAssert(poolIdx >= 0);
2614                 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
2615                         metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2616                         infiCmiChunkPools[poolIdx].startBuf = freePtr;
2617                         infiCmiChunkPools[poolIdx].count++;
2618                         
2619                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2620                 }else{
2621                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2622                         metaData->owner->metaData->count--;
2623                         if(metaData->owner->metaData == metaData){
2624                                 //I am the owner
2625                                 if(metaData->owner->metaData->count == 0){
2626                                         //all the chunks have been freed
2627                                         ibv_dereg_mr(metaData->key);
2628                                         free(freePtr);
2629                                         free(metaData);
2630                                 }
2631                                 //if I am the owner and all the chunks have not been
2632                                 // freed dont free my metaData. will need later
2633                         }else{
2634                                 if(metaData->owner->metaData->count == 0){
2635                                         //need to free the owner's buffer and metadata
2636                                         freePtr = metaData->owner;
2637                                         ibv_dereg_mr(metaData->key);
2638                                         free(metaData->owner->metaData);
2639                                         free(freePtr);
2640                                 }
2641                                 free(metaData);
2642                         }
2643                 }       
2644 #if CMK_SMP     
2645         CmiMemUnlock();
2646 #endif
2647 /*      }else{
2648                 free(freePtr);
2649         }*/
2650 }
2651 #endif
2652
2653 /*********************************************************************************************
This section is for CmiDirect, a variant of persistent communication in which
the user can transfer data between processors without using Charm++ messages. This lets the user
send and receive data from the middle of their arrays without any copying on either the send or the
receive side.
2658 *********************************************************************************************/
2659
/*
 * Description of a sender's buffer for a CmiDirect request: sending node,
 * handle number, the sender's memory-region key and the buffer address/size.
 * NOTE(review): in the visible part of this file it is referenced only by
 * commented-out code (the old RDMA-read based protocol).
 */
struct infiDirectRequestPacket{
        int senderProc;
        int handle;
        struct ibv_mr senderKey;
        void *senderBuf;
        int senderBufSize;
};
2667
#include "cmidirect.h"

/* Number of handles per table segment: handle h lives in segment
   h/MAXHANDLES at slot h%MAXHANDLES (see calcHandleTableIdx). */
#define MAXHANDLES 512

struct infiDirectHandleStruct;


/* Node of the CmiDirect polling queue. It is embedded in each handle
   (pollingQNode), so queueing a handle allocates nothing. */
typedef struct directPollingQNodeStruct {
        struct infiDirectHandleStruct *handle;
        struct directPollingQNodeStruct *next;
        double *lastDouble;     /* last double of the receive buffer; compared against initialValue */
} directPollingQNode;

/* One CmiDirect endpoint, either on the receiver (created by
   CmiDirect_createHandle) or on the sender (CmiDirect_assocLocalBuffer). */
typedef struct infiDirectHandleStruct{
        int id;                         /* handle number */
        void *buf;                      /* registered local buffer */
        int size;                       /* buffer size in bytes */
        struct ibv_mr *key;             /* memory region from ibv_reg_mr for buf */
        void (*callbackFnPtr)(void *);  /* receiver: run when data arrives; NULL on the sender */
        void *callbackData;
//      struct infiDirectRequestPacket *packet;
        struct infiDirectUserHandle userHandle;  /* copy of the handle exchanged with the user */
        struct infiRdmaPacket *rdmaPacket;       /* sender only: used as wr_id of the RDMA write */
        directPollingQNode pollingQNode;
}       infiDirectHandle;

/* Fixed-size segment of handles; segments are chained through next. */
typedef struct infiDirectHandleTableStruct{
        infiDirectHandle handles[MAXHANDLES];
        struct infiDirectHandleTableStruct *next;
} infiDirectHandleTable;
2698
2699
// data structures 

/* FIFO of receive handles waiting for data, scanned by pollCmiDirectQ */
directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;

/* per-node arrays of handle-table chains, allocated lazily:
   sendHandleTable is indexed by the receiver node,
   recvHandleTable is indexed by the sender node */
static infiDirectHandleTable **sendHandleTable=NULL;
static infiDirectHandleTable **recvHandleTable=NULL;

/* recvHandleCount[senderNode] = last handle number issued for that sender (starts at -1) */
static int *recvHandleCount=NULL;
2708
2709 void addHandleToPollingQ(infiDirectHandle *handle){
2710 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2711         directPollingQNode *newNode = &(handle->pollingQNode);
2712         newNode->handle = handle;
2713         newNode->next = NULL;
2714         if(headDirectPollingQ==NULL){
2715                 /*empty pollingQ*/
2716                 headDirectPollingQ = newNode;
2717                 tailDirectPollingQ = newNode;
2718         }else{
2719                 tailDirectPollingQ->next = newNode;
2720                 tailDirectPollingQ = newNode;
2721         }
2722 };
2723 /*
2724 infiDirectHandle *removeHandleFromPollingQ(){
2725         if(headDirectPollingQ == NULL){
2726                 //polling Q is empty
2727                 return NULL;
2728         }
2729         directPollingQNode *retNode = headDirectPollingQ;
2730         if(headDirectPollingQ == tailDirectPollingQ){
2731                 //PollingQ has one node 
2732                 headDirectPollingQ = tailDirectPollingQ = NULL;
2733         }else{
2734                 headDirectPollingQ = headDirectPollingQ->next;
2735         }
2736         infiDirectHandle *retHandle = retNode->handle;
2737         free(retNode);
2738         return retHandle;
2739 }*/
2740
2741 static inline infiDirectHandleTable **createHandleTable(){
2742         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2743         int i;
2744         for(i=0;i<_Cmi_numnodes;i++){
2745                 table[i] = NULL;
2746         }
2747         return table;
2748 }
2749
2750 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2751         *tableIdx = handle/MAXHANDLES;
2752         *idx = handle%MAXHANDLES;
2753 };
2754
/**
 * Write initialValue into the last sizeof(double) bytes of recvBuf.
 *
 * The polling code treats the buffer as "not yet arrived" while this
 * trailing double still equals initialValue.
 *
 * @param recvBuf      start of the receive buffer
 * @param recvBufSize  buffer size in bytes; must exceed sizeof(double)
 *                     (CmiDirect_createHandle asserts this)
 * @param initialValue sentinel value to store
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
        char *bytes = (char *)recvBuf;
        int offset;
        double *tail;

        offset = recvBufSize - sizeof(double);
        tail = (double *)(bytes + offset);
        *tail = initialValue;
}
2762
2763
2764 /**
2765  To be called on the receiver to create a handle and return its number
2766 **/
2767 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2768         int newHandle;
2769         int tableIdx,idx;
2770         int i;
2771         infiDirectHandleTable *table;
2772         struct infiDirectUserHandle userHandle;
2773         
2774         CmiAssert(recvBufSize > sizeof(double));
2775         
2776         if(recvHandleTable == NULL){
2777                 recvHandleTable = createHandleTable();
2778                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2779                 for(i=0;i<_Cmi_numnodes;i++){
2780                         recvHandleCount[i] = -1;
2781                 }
2782         }
2783         if(recvHandleTable[senderNode] == NULL){
2784                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2785                 recvHandleTable[senderNode]->next = NULL;               
2786         }
2787         
2788         newHandle = ++recvHandleCount[senderNode];
2789         CmiAssert(newHandle >= 0);
2790         
2791         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2792         
2793         table = recvHandleTable[senderNode];
2794         for(i=0;i<tableIdx;i++){
2795                 if(table->next ==NULL){
2796                         table->next = malloc(sizeof(infiDirectHandleTable));
2797                         table->next->next = NULL;
2798                 }
2799                 table = table->next;
2800         }
2801         table->handles[idx].id = newHandle;
2802         table->handles[idx].buf = recvBuf;
2803         table->handles[idx].size = recvBufSize;
2804 #if CMI_DIRECT_DEBUG
2805         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2806 #endif
2807         table->handles[idx].callbackFnPtr = callbackFnPtr;
2808         table->handles[idx].callbackData = callbackData;
2809         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2810         CmiAssert(table->handles[idx].key != NULL);
2811
2812 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2813         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2814         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2815         
2816         userHandle.handle = newHandle;
2817         userHandle.recverNode = _Cmi_mynode;
2818         userHandle.senderNode = senderNode;
2819         userHandle.recverBuf = recvBuf;
2820         userHandle.recverBufSize = recvBufSize;
2821         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2822         userHandle.initialValue = initialValue;
2823         
2824         table->handles[idx].userHandle = userHandle;
2825         
2826         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2827
2828   {
2829          int index = table->handles[idx].size - sizeof(double);
2830    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2831         } 
2832         
2833         addHandleToPollingQ(&(table->handles[idx]));
2834         
2835 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2836         
2837         return userHandle;
2838 }
2839
2840 /****
2841  To be called on the sender to attach the sender's buffer to this handle
2842 ******/
2843 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2844         int tableIdx,idx;
2845         int i;
2846         int handle = userHandle->handle;
2847         int recverNode  = userHandle->recverNode;
2848
2849         infiDirectHandleTable *table;
2850
2851         if(sendHandleTable == NULL){
2852                 sendHandleTable = createHandleTable();
2853         }
2854         if(sendHandleTable[recverNode] == NULL){
2855                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2856                 sendHandleTable[recverNode]->next = NULL;
2857         }
2858         
2859         CmiAssert(handle >= 0);
2860         calcHandleTableIdx(handle,&tableIdx,&idx);
2861         
2862         table = sendHandleTable[recverNode];
2863         for(i=0;i<tableIdx;i++){
2864                 if(table->next ==NULL){
2865                         table->next = malloc(sizeof(infiDirectHandleTable));
2866                         table->next->next = NULL;
2867                 }
2868                 table = table->next;
2869         }
2870
2871         table->handles[idx].id = handle;
2872         table->handles[idx].buf = sendBuf;
2873
2874         table->handles[idx].size = sendBufSize;
2875 #if CMI_DIRECT_DEBUG
2876         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2877 #endif
2878         table->handles[idx].callbackFnPtr = table->handles[idx].callbackData = NULL;
2879         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2880         CmiAssert(table->handles[idx].key != NULL);
2881         table->handles[idx].userHandle = *userHandle;
2882         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2883         
2884         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2885         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2886         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2887         
2888         
2889 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2890         table->handles[idx].packet->senderProc = _Cmi_mynode;
2891         table->handles[idx].packet->handle = handle;
2892         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2893         table->handles[idx].packet->senderBuf = sendBuf;
2894         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2895         
2896         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2897 };
2898
2899
2900
2901
2902
2903 /****
2904 To be called on the sender to do the actual data transfer
2905 ******/
2906 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2907         int handle = userHandle->handle;
2908         int recverNode = userHandle->recverNode;
2909         if(recverNode == _Cmi_mynode){
2910                 /*when the sender and receiver are on the same
2911                 processor, just look up the sender and receiver
2912                 buffers and do a memcpy*/
2913
2914                 infiDirectHandleTable *senderTable;
2915                 infiDirectHandleTable *recverTable;
2916                 
2917                 int tableIdx,idx,i;
2918
2919                 
2920                 /*find entry for this handle in sender table*/
2921                 calcHandleTableIdx(handle,&tableIdx,&idx);
2922                 CmiAssert(sendHandleTable!= NULL);
2923                 senderTable = sendHandleTable[_Cmi_mynode];
2924                 CmiAssert(senderTable != NULL);
2925                 for(i=0;i<tableIdx;i++){
2926                         senderTable = senderTable->next;
2927                 }
2928
2929                 /**find entry for this handle in recver table*/
2930                 recverTable = recvHandleTable[recverNode];
2931                 CmiAssert(recverTable != NULL);
2932                 for(i=0;i< tableIdx;i++){
2933                         recverTable = recverTable->next;
2934                 }
2935                 
2936                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
2937                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
2938 #if CMI_DIRECT_DEBUG
2939                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
2940 #endif
2941                 // The polling Q should find you and handle the callback and pollingq entry
2942                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
2943                 
2944
2945         }else{
2946                 infiPacket packet;
2947                 int tableIdx,idx;
2948                 int i;
2949                 OtherNode node;
2950                 infiDirectHandleTable *table;
2951
2952                 calcHandleTableIdx(handle,&tableIdx,&idx);
2953
2954                 table = sendHandleTable[recverNode];
2955                 CmiAssert(table != NULL);
2956                 for(i=0;i<tableIdx;i++){
2957                         table = table->next;
2958                 }
2959
2960 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
2961 #if CMI_DIRECT_DEBUG
2962                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
2963 #endif
2964
2965                 
2966                 {
2967                         
2968                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
2969                         struct ibv_sge list = {
2970                                 .addr = (uintptr_t )table->handles[idx].buf,
2971                                 .length = table->handles[idx].size,
2972                                 .lkey   = table->handles[idx].key->lkey
2973                         };
2974                         
2975                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
2976
2977                         struct ibv_send_wr *bad_wr;
2978                         struct ibv_send_wr wr = {
2979                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
2980                                 .sg_list = &list,
2981                                 .num_sge = 1,
2982                                 .opcode = IBV_WR_RDMA_WRITE,
2983                                 .send_flags = IBV_SEND_SIGNALED,
2984                                 
2985                                 .wr.rdma = {
2986                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
2987                                         .rkey = remoteKey->rkey
2988                                 }
2989                         };
2990                         /** post and rdma_read that is a rdma get*/
2991                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
2992                                 CmiAssert(0);
2993                         }
2994                 }
2995
2996         /*      MallocInfiPacket (packet);
2997                 {
2998                         packet->size = sizeof(struct infiDirectRequestPacket);
2999                         packet->buf = (char *)(table->handles[idx].packet);
3000                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3001                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3002                 }*/
3003         }
3004
3005 };
3006
3007 /**** need not be called the first time *********/
3008 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3009   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3010 }
3011
3012 /**** need not be called the first time *********/
3013 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3014         int handle = userHandle->handle;
3015         int tableIdx,idx,i;
3016         infiDirectHandleTable *table;
3017         calcHandleTableIdx(handle,&tableIdx,&idx);
3018         
3019         table = recvHandleTable[userHandle->senderNode];
3020         CmiAssert(table != NULL);
3021         for(i=0;i<tableIdx;i++){
3022                 table = table->next;
3023         }
3024 #if CMI_DIRECT_DEBUG
3025   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3026 #endif  
3027         addHandleToPollingQ(&(table->handles[idx]));
3028         
3029
3030 }
3031
3032 /**** need not be called the first time *********/
3033 void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
3034         int handle = userHandle->handle;
3035         int tableIdx,idx,i;
3036         infiDirectHandleTable *table;
3037         
3038         initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3039
3040         calcHandleTableIdx(handle,&tableIdx,&idx);
3041         
3042         table = recvHandleTable[userHandle->senderNode];
3043         CmiAssert(table != NULL);
3044         for(i=0;i<tableIdx;i++){
3045                 table = table->next;
3046         }
3047 #if CMI_DIRECT_DEBUG
3048   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3049 #endif  
3050         addHandleToPollingQ(&(table->handles[idx]));
3051         
3052 }
3053
3054
3055 static int receivedDirectMessage(infiDirectHandle *handle){
3056 //      int index = handle->size - sizeof(double);
3057 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
3058         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3059                 return 0;
3060         }else{
3061                 (*(handle->callbackFnPtr))(handle->callbackData);       
3062                 return 1;
3063         }
3064         
3065 }
3066
3067
/**
 * Scan the CmiDirect polling queue once.
 *
 * For each queued receive handle, receivedDirectMessage() checks whether the
 * buffer's trailing double has changed and, if so, runs the handle's callback.
 * That node is then unlinked, with head/tail fixed up. Nodes are embedded in
 * their handles, so nothing is freed. Handles still waiting stay queued in
 * their original order.
 *
 * NOTE(review): the callback runs before its node is unlinked. Suppose a
 * callback re-queues the same handle (e.g. via CmiDirect_ready or
 * CmiDirect_readyPollQ). addHandleToPollingQ then resets node->next to NULL
 * and appends the still-linked node. That can cut off the nodes after it,
 * or leave the handle unqueued once the unlink below runs. Confirm that
 * callbacks never do this.
 */
static void pollCmiDirectQ(){
        directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
        while(ptr != NULL){
                if(receivedDirectMessage(ptr->handle)){
#if CMI_DIRECT_DEBUG
      CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
#endif
                        /* delPtr is unused: the node lives inside its handle and is not freed */
                        directPollingQNode *delPtr = ptr;
                        /** has been received and delete this node***/
                        if(prevPtr == NULL){
                                /** first in the pollingQ**/
                                if(headDirectPollingQ == tailDirectPollingQ){
                                        /**only node in pollingQ****/
                                        headDirectPollingQ = tailDirectPollingQ = NULL;
                                }else{
                                        headDirectPollingQ = headDirectPollingQ->next;
                                }
                        }else{
                                if(ptr == tailDirectPollingQ){
                                        /**last node is being deleted**/
                                        tailDirectPollingQ = prevPtr;
                                }
                                prevPtr->next = ptr->next;
                        }
                        /* ptr->next is still intact after unlinking, so the scan continues from it;
                           prevPtr stays put because the current node left the list */
                        ptr = ptr->next;
                //      free(delPtr);
                }else{
                        prevPtr = ptr;
                        ptr = ptr->next;
                }
        }
}
3100
3101
3102 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3103         int senderProc = directRequestPacket->senderProc;
3104         int handle = directRequestPacket->handle;
3105         int tableIdx,idx,i;
3106         infiDirectHandleTable *table;
3107         OtherNode node = nodes_by_pe[senderProc];
3108
3109         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3110
3111         calcHandleTableIdx(handle,&tableIdx,&idx);
3112
3113         table = recvHandleTable[senderProc];
3114         CmiAssert(table != NULL);
3115         for(i=0;i<tableIdx;i++){
3116                 table = table->next;
3117         }
3118         
3119         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3120         
3121         {
3122                 struct ibv_sge list = {
3123                         .addr = (uintptr_t )table->handles[idx].buf,
3124                         .length = table->handles[idx].size,
3125                         .lkey   = table->handles[idx].key->lkey
3126                 };
3127
3128                 struct ibv_send_wr *bad_wr;
3129                 struct ibv_send_wr wr = {
3130                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3131                         .sg_list = &list,
3132                         .num_sge = 1,
3133                         .opcode = IBV_WR_RDMA_READ,
3134                         .send_flags = IBV_SEND_SIGNALED,
3135                         .wr.rdma = {
3136                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3137                                 .rkey = directRequestPacket->senderKey.rkey
3138                         }
3139                 };
3140 //       post and rdma_read that is a rdma get
3141                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3142                         CmiAssert(0);
3143                 }
3144         }
3145                         
3146         
3147 };*/
3148 /*
3149 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3150         MACHSTATE(3,"processDirectWC");
3151         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3152         (*(handle->callbackFnPtr))(handle->callbackData);
3153 };
3154 */
3155
3156 static void sendBarrierMessage(int pe)
3157 {
3158   /* we will only need one packet */
3159   int size=32;
3160   OtherNode  node = nodes + pe;
3161   infiPacket packet;
3162   MallocInfiPacket(packet);
3163   packet->size = size;
3164   packet->buf = CmiAlloc(size);
3165   packet->header.code = INFIBARRIERPACKET;
3166   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3167   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3168   /*  pollSendCq(0);*/
3169   EnqueuePacket(node,packet,size,key);
3170 }
3171
3172 static void recvBarrierMessage()
3173 {
3174   int i;
3175   int ne;
3176   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3177   struct ibv_wc wc[1];
3178   struct ibv_wc *recvWC;
3179   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3180   int toBuffer=1; // buffer without processing recvd messages
3181   int barrierReached=0;
3182   struct infiBuffer *buffer = NULL;
3183   struct infiPacketHeader *header = NULL;
3184   int nodeNo=-1;
3185   int len=-1;
3186   while(!barrierReached)
3187     {
3188       /* gengbin's semantic will implode if more than one q is polled at a time */
3189       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3190       //        CmiAssert(ne >=0);
3191       if(ne != 0){
3192         MACHSTATE1(3,"recvBarrier ne %d",ne);
3193       }
3194       pollSendCq(1); 
3195       for(i=0;i<ne;i++){
3196         if(wc[i].status != IBV_WC_SUCCESS){
3197           CmiAssert(0);
3198         }
3199         switch(wc[i].opcode){
3200         case IBV_WC_RECV: /* we have something to consider*/
3201           recvWC=&wc[i];
3202           buffer = (struct infiBuffer *) recvWC->wr_id; 
3203           header = (struct infiPacketHeader *)buffer->buf;
3204           nodeNo = header->nodeNo;
3205           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3206
3207           if(header->code & INFIPACKETCODE_DATA){
3208             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3209           }
3210           if(header->code & INFIDUMMYPACKET){
3211             MACHSTATE(3,"Dummy packet");
3212           }
3213           if(header->code & INFIBARRIERPACKET){
3214             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3215             // now we are done
3216             barrierReached=1;
3217             /* semantically questionable */
3218             //processAllBufferedMsgs();
3219             //return;
3220           }
3221           if(rdma && header->code & INFIRDMA_START){
3222             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3223             //          if(toBuffer){
3224             //TODO: make a function of this and use for both acks and requests
3225             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3226             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3227             *copyPacket = *rdmaPacket;
3228             copyPacket->fromNodeNo = nodeNo;
3229             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3230             context->bufferedRdmaRequests = copyPacket;
3231             copyPacket->next = tmp;
3232             copyPacket->prev = NULL;
3233             if(tmp != NULL){
3234               tmp->prev = copyPacket;
3235             }
3236             /*          }else{
3237                         processRdmaRequest(rdmaPacket,nodeNo,0);
3238                         }*/
3239           }
3240           if(rdma && header->code & INFIRDMA_ACK){
3241             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3242             processRdmaAck(rdmaPacket);
3243           }
3244           {
3245             struct ibv_sge list = {
3246               .addr     = (uintptr_t) buffer->buf,
3247               .length = buffer->size,
3248               .lkey     = buffer->key->lkey
3249             };
3250         
3251             struct ibv_recv_wr wr = {
3252               .wr_id = (uint64_t)buffer,
3253               .sg_list = &list,
3254               .num_sge = 1,
3255               .next = NULL
3256             };
3257             struct ibv_recv_wr *bad_wr;
3258         
3259             if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3260               CmiAssert(0);
3261             }
3262           }
3263
3264           break;
3265         default:
3266           CmiAbort("Wrong type of work completion object in recvq");
3267           break;
3268         }
3269       }
3270     }
3271   /* semantically questionable */
3272   //  processAllBufferedMsgs();
3273 }
3274
3275
3276 /* happen at node level */
3277 int CmiBarrier()
3278 {
3279   int len, size, i;
3280   int status;
3281   int count = 0;
3282   OtherNode node;
3283   int numnodes = CmiNumNodes();
3284   if (CmiMyRank() == 0) {
3285     /* every one send to pe 0 */
3286     if (CmiMyNode() != 0) {
3287       sendBarrierMessage(0);
3288     }
3289     /* printf("[%d] HERE\n", CmiMyPe()); */
3290     if (CmiMyNode() == 0) 
3291     {
3292       for (count = 1; count < numnodes; count ++) 
3293       {
3294         recvBarrierMessage();
3295       }
3296       /* pe 0 broadcast */
3297       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3298         int p = i;
3299         if (p > numnodes - 1) break;
3300         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3301         sendBarrierMessage(p);
3302       }
3303     }
3304     /* non 0 node waiting */
3305     if (CmiMyNode() != 0) 
3306     {
3307       recvBarrierMessage();
3308       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3309         int p = CmiMyNode();
3310         p = BROADCAST_SPANNING_FACTOR*p + i;
3311         if (p > numnodes - 1) break;
3312         p = p%numnodes;
3313         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3314         sendBarrierMessage(p);
3315       }
3316     }
3317   }
3318   CmiNodeAllBarrier();
3319   processAllBufferedMsgs();
3320   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3321 }
3322
/**
 * One-directional barrier: gathers at node 0 with no release phase.
 *
 * Rank 0 of every node other than 0 sends one barrier message to node 0
 * and continues without waiting. Rank 0 of node 0 blocks until it has
 * received CmiNumNodes()-1 barrier messages. Afterwards every rank calls
 * CmiNodeAllBarrier() and then processAllBufferedMsgs().
 *
 * @return always 0 (previously the function fell off its end without a
 *         return value, which is undefined if the caller uses the result).
 */
int CmiBarrierZero()
{
  if (CmiMyRank() == 0) {
    if (CmiMyNode() != 0) {
      sendBarrierMessage(0);
    } else {
      const int expected = CmiNumNodes() - 1;
      for (int i = 0; i < expected; i++) {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  return 0;
}
3342