271252977083a597b5d5bf920517fabbec8d1944
[charm.git] / src / arch / net / machine-ibverbs.c
1 /** @file
3  * Ibverbs (InfiniBand) implementation of the Converse NET version
4  * @ingroup NET
5  * contains only Ibverbs specific code for:
6  * - CmiMachineInit()
7  * - CmiCommunicationInit()
8  * - CmiNotifyStillIdle()
9  * - DeliverViaNetwork()
10  * - CommunicationServer()
11  * - CmiMachineExit()
12
13   Created by
14         Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
15 */
16
17 /**
18  * @addtogroup NET
19  * @{
20  */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <malloc.h>
28 #include <getopt.h>
29 #include <time.h>
30
31 #include <infiniband/verbs.h>
32
/* Path MTU programmed into every QP when it is moved to RTR (initInfiOtherNodeData). */
33 enum ibv_mtu mtu = IBV_MTU_2048;
/* Sizing and RDMA tuning parameters.  page_size is recorded by
   allocateInfiBufferPool; the rest are assigned in CmiMachineInit
   (packetSize = 4*mtu_size, dataSize = packetSize minus the packet header).
   NOTE(review): mtu_size only sizes packets; it is independent of `mtu`. */
34 static int page_size;
35 static int mtu_size;
36 static int packetSize;
37 static int dataSize;
38 static int rdma;
39 static int rdmaThreshold;
40 static int firstBinSize;
41 static int blockAllocRatio;
42 static int blockThreshold;
43
44
/* Receive-buffer and flow-control budgets, assigned in CmiMachineInit
   (maxTokens = maxRecvBuffers; sendPacketPoolSize = min(maxTokens/2, 2000)). */
45 static int maxRecvBuffers;
46 static int maxTokens;
47 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
48 static int sendPacketPoolSize; /*total number of send buffers created*/
49
/* Statistics counters.  regCount, regTime, pktCount, msgCount, minTokensLeft,
   processLockTime, processBufferedTime and processBufferedCount are reset in
   CmiMachineInit only when CMK_IBVERBS_STATS is enabled; otherwise they keep
   their static zero initial value.  CmiMachineExit prints the
   processBuffered and processLock values.
   NOTE(review): _startTime and _auxTime are not referenced in the code shown
   here, and file-scope names starting with '_' are reserved in C - confirm
   before keeping them. */
50 static double _startTime=0;
51 static int regCount;
52
53 static int pktCount;
54 static int msgCount;
55 static int minTokensLeft;
56
57
58 static double regTime;
59
60 //TODO: erase this
61 static double processLockTime;
62 static double _auxTime;
63
64 static double processBufferedTime;
65 static int processBufferedCount;
66
/* Compile-time switches for this layer:
   - CMK_IBVERBS_STATS: reset the counters in CmiMachineInit and print them in CmiMachineExit.
   - CMK_IBVERBS_TOKENS_FLOW: getFreeTokens blocks while context->tokensLeft is 0.
   - CMK_IBVERBS_INCTOKENS: per-node token increase requests in EnqueuePacket.
   - CMK_IBVERBS_DEBUG: adds psn sequence fields to packet headers and per-node data. */
67 #define CMK_IBVERBS_STATS 0
68 #define CMK_IBVERBS_TOKENS_FLOW 1
69 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on 
70 #define CMK_IBVERBS_DEBUG 0
71 #define CMI_DIRECT_DEBUG 0
/* NOTE(review): not used in the code shown; presumably the number of work
   completions fetched per CQ poll - confirm. */
72 #define WC_LIST_SIZE 32
73 /*#define WC_BUFFER_SIZE 100*/
74
75
76
77
/* Only used with CMK_IBVERBS_INCTOKENS: EnqueuePacket requests more tokens once
   a node's tokensLeft drops below INCTOKENS_FRACTION * totalTokens.
   INCTOKENS_INCREASE is not referenced in the code shown here. */
78 #define INCTOKENS_FRACTION 0.04
79 #define INCTOKENS_INCREASE .50
80
81 // flag for using a pool for every thread
/* When set, chunk/buffer pool setup moves from CmiMachineInit to
   CmiCommunicationInit (initInfiCmiChunkPools + fillBufferPools). */
82 #define THREAD_MULTI_POOL 0
83
84 #if THREAD_MULTI_POOL 
85 #include "pcqueue.h"
86 PCQueue **queuePool;
87 void infi_CmiFreeDirect(void *ptr);
88 static inline void fillBufferPools();
89 #endif
90
/* NOTE(review): not used in the code shown; the value continues the
   power-of-two INFI* packet codes (INFIDUMMYPACKET is 64). If it is stored in
   the plain-char header code, 128 does not fit when char is signed - confirm. */
91 #define INFIBARRIERPACKET 128
92
/* Payload of a token-increase acknowledgement (presumably sent with
   INFIPACKETCODE_INCTOKENSACK - confirm). */
93 struct infiIncTokenAckPacket{
94         int a;
95 };
96
/* This layer keeps no idle state (CmiNotifyGetState returns NULL); the dummy
   member exists only because C structs need at least one member. */
97 typedef struct {
98 char none;  
99 } CmiIdleState;
100
101 //TODO:ERASE this
/* Debug counters, reported through MACHSTATE2 in CmiMachineExit. */
102 static int TESTneighbor;
103 static int TESTfrees;
104
105
106 /********
107 ** The notify idle methods
108 ***/
109
110 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
111
112 static void CmiNotifyStillIdle(CmiIdleState *s);
113
114
115 static void CmiNotifyBeginIdle(CmiIdleState *s)
116 {
117   CmiNotifyStillIdle(s);
118 }
119
120 void CmiNotifyIdle(void) {
121   CmiNotifyStillIdle(NULL);
122 }
123
124 /***************
125 Data Structures 
126 ***********************/
127
128 /******
129         This is a header attached to the beginning of every infiniband packet
130 *******/
/* Packet codes.  Each is a distinct power of two so several can be combined in
   infiPacketHeader.code (EnqueuePacket ORs INFIPACKETCODE_INCTOKENS into it). */
131 #define INFIPACKETCODE_DATA 1
132 #define INFIPACKETCODE_INCTOKENS 2
133 #define INFIRDMA_START 4
134 #define INFIRDMA_ACK 8
135 #define INFIDIRECT_REQUEST 16
136 #define INFIPACKETCODE_INCTOKENSACK 32
137 #define INFIDUMMYPACKET 64
138
/* Sent as scatter/gather element 0 of every send (see newPacket). */
139 struct infiPacketHeader{
140         char code; /* combination of the codes above; NOTE(review): plain char, so a value of 128 would not fit if char is signed */
141         int nodeNo; /* sender's node number (CmiMachineInit sets context->header.nodeNo = _Cmi_mynode) */
142 #if     CMK_IBVERBS_DEBUG
143         int psn; /* per-destination sequence number, debug builds only (EnqueuePacket) */
144 #endif  
145 };
146
147 /*
148         Types of rdma packets
149 */
/* Values of infiRdmaPacket.type. */
150 #define INFI_MESG 1 
151 #define INFI_DIRECT 2
152
/* Bookkeeping for one RDMA transfer.  Instances are linked (next/prev) into
   context->bufferedRdmaAcks / bufferedRdmaRequests.
   NOTE(review): roles of key/keyPtr/remoteBuf/localBuffer are inferred from
   their names - confirm against the RDMA code that fills them. */
153 struct infiRdmaPacket{
154         int fromNodeNo;
155         int type; /* INFI_MESG or INFI_DIRECT */
156         struct ibv_mr key;
157         struct ibv_mr *keyPtr;
158         int remoteSize;
159         char *remoteBuf;
160         void *localBuffer;
161         OutgoingMsg ogm;
162         struct infiRdmaPacket *next,*prev;
163 };
164
165
166 /** Represents a buffer that is used to receive messages
167 */
/* Values of infiBuffer.type (allocateInfiBufferPool creates BUFFER_RECV buffers). */
168 #define BUFFER_RECV 1
169 #define BUFFER_RDMA 2
170 struct infiBuffer{
171         int type; /* BUFFER_RECV or BUFFER_RDMA */
172         char *buf; /* start of the buffer; its address is the receive s/g address */
173         int size; /* length of buf in bytes */
174         struct ibv_mr *key; /* registration covering buf; all buffers of one pool share a single key */
175 };
176
177
178
179 /** At the moment it is a simple pool with just a list of buffers
180         * TODO; extend it to make it an element in a linklist of pools
181 */
/* `buffers` holds numBuffers entries; allocateInfiBufferPool sets next to NULL. */
182 struct infiBufferPool{
183         int numBuffers;
184         struct infiBuffer *buffers;
185         struct infiBufferPool *next;
186 };
187
188 /*****
189         It is the structure for the send buffers that are used
190         to send messages to other nodes
191 ********/
/* newPacket points elemList[0] at `header` and prebuilds `wr` as a signalled
   IBV_WR_SEND with two s/g elements whose wr_id is the packet itself.
   EnqueuePacket points elemList[1] at buf/size before posting.
   Free packets are chained through `next` on context->infiPacketFreeList,
   with size == -1 and ogm == NULL.  `infiPacket` is a pointer typedef. */
192
193
194 typedef struct infiPacketStruct {       
195         char *buf;
196         int size;
197         struct infiPacketHeader header;
198         struct ibv_mr *keyHeader;
199         struct OtherNodeStruct *destNode;
200         struct infiPacketStruct *next;
201         OutgoingMsg ogm;
202         struct ibv_sge elemList[2];
203         struct ibv_send_wr wr;
204 }* infiPacket;
205
/* Disabled: buffering of work completions (would need WC_BUFFER_SIZE). */
206 /*
207 typedef struct infiBufferedWCStruct{
208         struct ibv_wc wcList[WC_BUFFER_SIZE];
209         int count;
210         struct infiBufferedWCStruct *next,*prev;
211 } * infiBufferedWC;
212 */
213
/* Buffered broadcasts are kept in chunks of BCASTLIST_SIZE entries, and the
   chunks are linked (next/prev) from context->bufferedBcastList.
   NOTE(review): per-field meanings (broot = broadcast root, asm_rank, valid
   flag, count = entries in use) are inferred from names - confirm. */
214 #define BCASTLIST_SIZE 50
215
216 struct infiBufferedBcastStruct{
217         char *msg;
218         int size;
219         int broot;
220         int asm_rank;
221         int valid;
222 };
223
224 typedef struct infiBufferedBcastPoolStruct{
225         struct infiBufferedBcastPoolStruct *next,*prev;
226         struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
227         int count;
228
229 } *infiBufferedBcastPool;
230
231
232
233
234 /***
235         This structure represents the data needed by the infiniband
236         communication routines of a node
237         TODO: add locking for the smp version
238 */
239 struct infiContext {
240         struct ibv_context      *context; /* opened device (CmiMachineInit) */
241         
/* NOTE(review): the setup of asyncFds/tmo in CmiMachineInit is commented out;
   they are not used in the code shown. */
242         fd_set  asyncFds;
243         struct timeval tmo;
244         
245         int ibPort;
246 //      struct ibv_comp_channel *channel;
247         struct ibv_pd           *pd;
248         struct ibv_cq           *sendCq; /* shared by every QP (createLocalQps) */
249         struct ibv_cq   *recvCq; /* shared by every QP (createLocalQps) */
250         struct ibv_srq  *srq; /* shared receive queue; only created when there is more than one node */
251         
252         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
253                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
254                                                                                                 //when the qps are stored in the corresponding OtherNodes
/* qp and localAddr are released by infiPostInitialRecvs. */
255
256         struct infiAddr *localAddr; //store the lid,qpn,psn address of our qpairs until they are sent
257
258         infiPacket infiPacketFreeList; /* free send packets (FreeInfiPacket / MallocInfiPacket) */
259         
260         struct infiBufferPool *recvBufferPool; /* receive buffers posted to srq (infiPostInitialRecvs) */
261
262         struct infiPacketHeader header; /* template copied into every new packet (newPacket) */
263
264         int srqSize;
265         int sendCqSize,recvCqSize;
266         int tokensLeft; /* send tokens; starts at maxTokens, getFreeTokens waits while it is 0 */
267
268         infiBufferedBcastPool bufferedBcastList;
269         
270         struct infiRdmaPacket *bufferedRdmaAcks;
271         
272         struct infiRdmaPacket *bufferedRdmaRequests;
273 /*      infiBufferedWC infiBufferedRecvList;*/
274         
275         int insideProcessBufferedBcasts;
276 };
277
/* The single per-process context, allocated in CmiMachineInit. */
278 static struct infiContext *context;
279
280
281
282
283 /** Connection address of one local queue pair: LID, QP number and
 initial packet sequence number (PSN). createLocalQps fills one entry per
 remote node, copyInfiAddr serialises them for the address exchange, and the
 peer's triple arrives in initInfiOtherNodeData as addr[0..2].
285 */
286 struct infiAddr {
287         int lid,qpn,psn;
288 };
289
290 /**
291  Stored in the OtherNode structure in machine-dgram.c 
292  Store the per node data for ibverbs layer
293 */
/* Values of infiOtherNodeData.state; initInfiOtherNodeData starts with INFI_HEADER_DATA. */
294 enum { INFI_HEADER_DATA=21,INFI_DATA};
295
/* Per-remote-node ibverbs data, created by initInfiOtherNodeData, which also
   connects `qp` (RTR then RTS). */
296 struct infiOtherNodeData{
297         struct ibv_qp *qp ;
298         int state;// does it expect a packet with a header (first packet) or one without
299         int totalTokens;
300         int tokensLeft;
301         int nodeNo;
302         
303         int postedRecvs;
304         int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
305 #if     CMK_IBVERBS_DEBUG       
306         int psn;
307         int recvPsn;
308 #endif  
309 };
310
311
312 /********************************
313 Memory management structures and types
314 *****************/
315
struct infiCmiChunkHeaderStruct;

/**
 * Bookkeeping attached to every chunk handed out by the ibverbs allocator.
 * newPacket reads `key` through METADATAFIELD to obtain the registration of
 * a CmiAlloc'd packet.
 * NOTE(review): roles of poolIdx/nextBuf/owner/count are inferred from their
 * names (pool index, free-list link, owning chunk, reference count) - confirm.
 */
typedef struct infiCmiChunkMetaDataStruct {
    struct ibv_mr *key;                     /* memory registration covering the chunk */
    int poolIdx;
    void *nextBuf;
    struct infiCmiChunkHeaderStruct *owner;
    int count;

#if THREAD_MULTI_POOL
    int parentPe;                           /* the PE that allocated the buffer and must release it */
#endif
} infiCmiChunkMetaData;

/**
 * Metadata pointer of the chunk whose user pointer is `m`: the
 * infiCmiChunkHeader sits immediately before the user data.
 * The argument is parenthesised so expression arguments such as
 * METADATAFIELD(buf + off) are cast as a whole, not just `buf`.
 */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)

typedef struct {
    int size;       /* without infiCmiChunkHeader */
    void *startBuf;
    int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 20
#define INFIMAXPERPOOL 100
#define INFIMULTIPOOL (-5)

#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
353
354
355 static inline infiPacket newPacket(){
356         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
357         pkt->size = -1;
358         pkt->header = context->header;
359         pkt->next = NULL;
360         pkt->destNode = NULL;
361         pkt->keyHeader = METADATAFIELD(pkt)->key;
362         pkt->ogm=NULL;
363         CmiAssert(pkt->keyHeader!=NULL);
364         
365         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
366         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
367         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
368         
369         pkt->wr.wr_id = (uint64_t)pkt;
370         pkt->wr.sg_list = &(pkt->elemList[0]);
371         pkt->wr.num_sge = 2;
372         pkt->wr.opcode = IBV_WR_SEND;
373         pkt->wr.send_flags = IBV_SEND_SIGNALED;
374         pkt->wr.next = NULL;
375         
376         return pkt;
377 };
378
/**
 * Return send packet `pkt` to the context's free list.
 *
 * Marks the packet unused (size = -1, ogm = NULL) and pushes it on the head
 * of context->infiPacketFreeList.  The argument is evaluated exactly once, and
 * the do { } while (0) wrapper makes the macro a single statement, so
 * `if (c) FreeInfiPacket(p); else ...` parses as intended.
 */
#define FreeInfiPacket(pkt) do { \
    infiPacket freeInfiPacketTmp_ = (pkt); \
    freeInfiPacketTmp_->size = -1; \
    freeInfiPacketTmp_->ogm = NULL; \
    freeInfiPacketTmp_->next = context->infiPacketFreeList; \
    context->infiPacketFreeList = freeInfiPacketTmp_; \
} while (0)
385
/**
 * Take a send packet: pop the head of context->infiPacketFreeList, or create
 * a fresh one with newPacket() when the list is empty, and store it in `pkt`
 * (an lvalue).
 *
 * The temporary has a name that cannot collide with caller variables: the
 * old temporary `p` shadowed a caller argument named `p`, so
 * MallocInfiPacket(p) left the caller's `p` unassigned.  The macro is wrapped
 * in do { } while (0) so it behaves as a single statement.
 */
#define MallocInfiPacket(pkt) do { \
    infiPacket mallocInfiPacketTmp_ = context->infiPacketFreeList; \
    if (mallocInfiPacketTmp_ == NULL) { \
        mallocInfiPacketTmp_ = newPacket(); \
    } else { \
        context->infiPacketFreeList = mallocInfiPacketTmp_->next; \
    } \
    (pkt) = mallocInfiPacketTmp_; \
} while (0)
392
393
394
395
396 /******************CmiMachineInit and its helper functions*/
397 static inline int pollSendCq(const int toBuffer);
398
399 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
400 static uint16_t getLocalLid(struct ibv_context *context, int port);
401 static int  checkQp(struct ibv_qp *qp){
402         struct ibv_qp_attr attr;
403         struct ibv_qp_init_attr init_attr;
404                  
405         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
406         if(attr.cur_qp_state != IBV_QPS_RTS){
407                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
408                 return 0;
409         }
410         return 1;
411 }
412 static void checkAllQps(){
413         int i;
414         for(i=0;i<_Cmi_numnodes;i++){
415                 if(i != _Cmi_mynode){
416                         if(!checkQp(nodes[i].infiData->qp)){
417                                 pollSendCq(0);
418                                 CmiAssert(0);
419                         }
420                 }
421         }
422 }
423
424 #if CMK_IBVERBS_FAST_START
425 static void send_partial_init();
426 #endif
427
428 static void CmiMachineInit(char **argv){
429         struct ibv_device **devList;
430         struct ibv_device *dev;
431         int ibPort;
432         int i;
433         int calcMaxSize;
434         infiPacket *pktPtrs;
435         struct infiRdmaPacket **rdmaPktPtrs;
436
437         MACHSTATE(3,"CmiMachineInit {");
438         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
439         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
440         
441         //TODO: make the device and ibport configureable by commandline parameter
442         //Check example for how to do that
443         devList =  ibv_get_device_list(NULL);
444         CmiAssert(devList != NULL);
445
446         dev = *devList;
447         CmiAssert(dev != NULL);
448
449         ibPort=1;
450
451         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
452
453         context = (struct infiContext *)malloc(sizeof(struct infiContext));
454         
455         MACHSTATE1(3,"context allocated %p",context);
456         
457         //localAddr will store the local addresses of all the qps
458         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
459         
460         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
461         
462         context->ibPort = ibPort;
463         //the context for this infiniband device 
464         context->context = ibv_open_device(dev);
465         CmiAssert(context->context != NULL);
466         
467         MACHSTATE1(3,"device opened %p",context->context);
468
469 /*      FD_ZERO(&context->asyncFds);
470         FD_SET(context->context->async_fd,&context->asyncFds);
471         context->tmo.tv_sec=0;
472         context->tmo.tv_usec=0;
473         
474         MACHSTATE(3,"asyncFds zeroed and set");*/
475
476         //protection domain
477         context->pd = ibv_alloc_pd(context->context);
478         CmiAssert(context->pd != NULL);
479         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
480
481   /******** At this point we know that this node is more or less serviceable
482         So, this is a good point for sending the partial init message for the fast
483         start case
484         Moreover, no work dependent on the number of nodes has started yet.
485         ************/
486
487 #if CMK_IBVERBS_FAST_START
488   send_partial_init();
489 #endif
490
491
492         context->header.nodeNo = _Cmi_mynode;
493
494         mtu_size=1200;
495         packetSize = mtu_size*4;
496         dataSize = packetSize-sizeof(struct infiPacketHeader);
497         
498         calcMaxSize=8000;
499 /*      if(_Cmi_numnodes*50 > calcMaxSize){
500                 calcMaxSize = _Cmi_numnodes*50;
501                 if(calcMaxSize > 10000){
502                         calcMaxSize = 10000;
503                 }
504         }*/
505 //      maxRecvBuffers=80;
506         maxRecvBuffers=calcMaxSize;
507         maxTokens = maxRecvBuffers;
508 //      maxTokens = 80;
509         context->tokensLeft=maxTokens;
510         //tokensPerProcessor=4;
511         if(_Cmi_numnodes > 1){
512                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
513         }
514         
515         
516         //TURN ON RDMA
517         rdma=1;
518 //      rdmaThreshold=32768;
519         rdmaThreshold=22000;
520         firstBinSize = 120;
521         CmiAssert(rdmaThreshold > firstBinSize);
522         blockAllocRatio=16;
523         blockThreshold=8;
524
525         //TODO:erase this
526         int ppn = 0;
527         CmiGetArgInt(argv,"+ppn",&CmiMyNodeSize());
528         MACHSTATE1(3,"CmiMyNodeSize %d",CmiMyNodeSize());
529         
530 #if !THREAD_MULTI_POOL
531         initInfiCmiChunkPools();
532 #endif
533
534         /*create the pool of send packets*/
535         sendPacketPoolSize = maxTokens/2;       
536         if(sendPacketPoolSize > 2000){
537                 sendPacketPoolSize = 2000;
538         }
539         
540         context->infiPacketFreeList=NULL;
541         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
542
543         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
544 #if !THREAD_MULTI_POOL
545         for(i=0;i<sendPacketPoolSize;i++){
546                 MallocInfiPacket(pktPtrs[i]);   
547         }
548
549         for(i=0;i<sendPacketPoolSize;i++){
550                 FreeInfiPacket(pktPtrs[i]);     
551         }
552         free(pktPtrs);
553 #endif
554         
555         context->bufferedBcastList=NULL;
556         context->bufferedRdmaAcks = NULL;
557         context->bufferedRdmaRequests = NULL;
558         context->insideProcessBufferedBcasts=0;
559         
560         
561         if(rdma){
562 /*              int numPkts;
563                 int k;
564                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
565                         numPkts = _Cmi_numnodes*4;
566                 }else{
567                         numPkts = maxRecvBuffers/4;
568                 }
569                 
570                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
571                 for(k=0;k<numPkts;k++){
572                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
573                 }
574                 
575                 for(k=0;k<numPkts;k++){
576                         CmiFree(rdmaPktPtrs[k]);
577                 }
578                 free(rdmaPktPtrs);*/
579         }
580         
581 /*      context->infiBufferedRecvList = NULL;*/
582 #if CMK_IBVERBS_STATS   
583         regCount =0;
584         regTime  = 0;
585
586         pktCount=0;
587         msgCount=0;
588
589         processBufferedCount=0;
590         processBufferedTime=0;
591
592         //TODO: erase this
593         processLockTime = 0;
594
595         minTokensLeft = maxTokens;
596 #endif  
597
598         
599
600         MACHSTATE(3,"} CmiMachineInit");
601 }
602
/**
 * Second-phase communication initialisation.
 * Only the per-thread-pool build (THREAD_MULTI_POOL) has work to do here: it
 * builds the chunk pools and fills the buffer pools, which the single-pool
 * build already did in CmiMachineInit.  Otherwise this is a no-op.
 * @param argv command line (not used by this layer)
 */
void CmiCommunicationInit(char **argv)
{
    (void)argv;
#if THREAD_MULTI_POOL
    initInfiCmiChunkPools();
    fillBufferPools();
#endif
}
610
611 /*********
612         Open a qp for every processor
613 *****/
614 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
615         int myLid;
616         int i;
617         
618         
619         //find my lid
620         myLid = getLocalLid(context->context,ibPort);
621         
622         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
623
624         context->sendCqSize = maxTokens+2;
625         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
626         CmiAssert(context->sendCq != NULL);
627         
628         MACHSTATE1(3,"sendCq created %p",context->sendCq);
629         
630         
631         context->recvCqSize = maxRecvBuffers;
632         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
633         
634         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
635         CmiAssert(context->recvCq != NULL);
636         
637         //array of queue pairs
638
639         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
640
641         if(numNodes > 1)
642         {
643                 context->srqSize = (maxRecvBuffers+2);
644                 struct ibv_srq_init_attr srqAttr = {
645                         .attr = {
646                         .max_wr  = context->srqSize,
647                         .max_sge = 1
648                         }
649                 };
650                 context->srq = ibv_create_srq(context->pd,&srqAttr);
651                 CmiAssert(context->srq != NULL);
652         
653                 struct ibv_qp_init_attr initAttr = {
654                         .qp_type = IBV_QPT_RC,
655                         .send_cq = context->sendCq,
656                         .recv_cq = context->recvCq,
657                         .srq             = context->srq,
658                         .sq_sig_all = 0,
659                         .qp_context = NULL,
660                         .cap     = {
661                                 .max_send_wr  = maxTokens,
662                                 .max_send_sge = 2,
663                         },
664                 };
665                 struct ibv_qp_attr attr;
666
667                 attr.qp_state        = IBV_QPS_INIT;
668                 attr.pkey_index      = 0;
669                 attr.port_num        = ibPort;
670                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
671
672 /*              MACHSTATE1(3,"context->pd %p",context->pd);
673                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
674                 MACHSTATE1(3,"TEST QP %p",qp);*/
675
676                 for( i=0;i<numNodes;i++){
677                         if(i == myNode){
678                         }else{
679                                 localAddr[i].lid = myLid;
680                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
681                         
682                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
683                         
684                                 CmiAssert(context->qp[i] != NULL);
685                         
686                         
687                                 ibv_modify_qp(context->qp[i], &attr,
688                                           IBV_QP_STATE              |
689                                           IBV_QP_PKEY_INDEX         |
690                                         IBV_QP_PORT               |
691                                         IBV_QP_ACCESS_FLAGS);           
692
693                                 localAddr[i].qpn = context->qp[i]->qp_num;
694                                 localAddr[i].psn = lrand48() & 0xffffff;
695                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
696                         }
697                 }
698         }
699         MACHSTATE(3,"qps created");
700 };
701
702 void copyInfiAddr(ChInfiAddr *qpList){
703         int qpListIdx=0;
704         int i;
705         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
706         for(i=0;i<_Cmi_numnodes;i++){
707                 if(i == _Cmi_mynode){
708                 }else{
709                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
710                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
711                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
712                         qpListIdx++;
713                 }
714         }
715 }
716
717
718 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
719         struct ibv_port_attr attr;
720
721         if (ibv_query_port(dev_context, port, &attr))
722                 return 0;
723
724         return attr.lid;
725 }
726
727 /**************** END OF CmiMachineInit and its helper functions*/
728
729 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
730 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
731
732 /* Initial the infiniband specific data for a remote node
733         1. connect the qp and store it in and return it
734 **/
735 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
736         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
737         int err;
738         ret->state = INFI_HEADER_DATA;
739         ret->qp = context->qp[node];
740 //      ret->totalTokens = tokensPerProcessor;
741 //      ret->tokensLeft = tokensPerProcessor;
742         ret->nodeNo = node;
743 //      ret->postedRecvs = tokensPerProcessor;
744 #if     CMK_IBVERBS_DEBUG       
745         ret->psn = 0;
746         ret->recvPsn = 0;
747 #endif
748         
749         struct ibv_qp_attr attr = {
750                 .qp_state               = IBV_QPS_RTR,
751                 .path_mtu               = mtu,
752                 .dest_qp_num            = addr[1],
753                 .rq_psn                 = addr[2],
754                 .max_dest_rd_atomic     = 1,
755                 .min_rnr_timer          = 31,
756                 .ah_attr                = {
757                         .is_global      = 0,
758                         .dlid           = addr[0],
759                         .sl             = 0,
760                         .src_path_bits  = 0,
761                         .port_num       = context->ibPort
762                 }
763         };
764         
765         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
766         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
767         
768         if (err = ibv_modify_qp(ret->qp, &attr,
769           IBV_QP_STATE              |
770           IBV_QP_AV                 |
771           IBV_QP_PATH_MTU           |
772           IBV_QP_DEST_QPN           |
773           IBV_QP_RQ_PSN             |
774           IBV_QP_MAX_DEST_RD_ATOMIC |
775           IBV_QP_MIN_RNR_TIMER)) {
776                         MACHSTATE1(3,"ERROR %d",err);
777                         CmiAbort("failed to change qp state to RTR");
778         }
779
780         MACHSTATE(3,"qp state changed to RTR");
781         
782         attr.qp_state       = IBV_QPS_RTS;
783         attr.timeout        = 26;
784         attr.retry_cnt      = 20;
785         attr.rnr_retry      = 7;
786         attr.sq_psn         = context->localAddr[node].psn;
787         attr.max_rd_atomic  = 1;
788
789         
790         if (ibv_modify_qp(ret->qp, &attr,
791           IBV_QP_STATE              |
792           IBV_QP_TIMEOUT            |
793           IBV_QP_RETRY_CNT          |
794           IBV_QP_RNR_RETRY          |
795           IBV_QP_SQ_PSN             |
796           IBV_QP_MAX_QP_RD_ATOMIC)) {
797                         fprintf(stderr, "Failed to modify QP to RTS\n");
798                         exit(1);
799         }
800         MACHSTATE(3,"qp state changed to RTS");
801
802         MACHSTATE(3,"} initInfiOtherNodeData");
803         return ret;
804 }
805
806
807 void    infiPostInitialRecvs(){
808         //create the pool and post the receives
809         int numPosts;
810 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
811                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
812         }else{
813                 numPosts = maxRecvBuffers;
814         }*/
815
816         if(_Cmi_numnodes > 1){
817                 numPosts = maxRecvBuffers;
818         }else{
819                 numPosts = 0;
820         }
821         if(numPosts > 0){
822                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
823                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
824         }
825
826
827         free(context->qp);
828         context->qp = NULL;
829         free(context->localAddr);
830         context->localAddr= NULL;
831 }
832
833 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
834         int numBuffers;
835         int i;
836         int bigSize;
837         char *bigBuf;
838         struct infiBufferPool *ret;
839         struct ibv_mr *bigKey;
840
841         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
842
843         page_size = sysconf(_SC_PAGESIZE);
844         ret = malloc(sizeof(struct infiBufferPool));
845         ret->next = NULL;
846         numBuffers=ret->numBuffers = numRecvs;
847         
848         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
849         
850         bigSize = numBuffers*sizePerBuffer;
851         bigBuf=malloc(bigSize);
852         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
853         CmiAssert(bigKey != NULL);
854         
855         for(i=0;i<numBuffers;i++){
856                 struct infiBuffer *buffer =  &(ret->buffers[i]);
857                 buffer->type = BUFFER_RECV;
858                 buffer->size = sizePerBuffer;
859                 
860                 /*buffer->buf = malloc(sizePerBuffer);
861                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
862                 buffer->buf = &bigBuf[i*sizePerBuffer];
863                 buffer->key = bigKey;
864
865                 if(buffer->key == NULL){
866                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
867                         CmiAssert(buffer->key != NULL);
868                 }
869         }
870         return ret;
871 };
872
873
874
875 /**
876          Post the buffers as recv work requests
877 */
878 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
879         int j,err;
880         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
881         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
882         struct ibv_recv_wr *bad_wr;
883         
884         int startBufferIdx=0;
885         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
886         for(j=0;j<numRecvs;j++){
887                 
888                 
889                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
890                 sgElements[j].length = sizePerBuffer;
891                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
892                 
893                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
894                 workRequests[j].sg_list = &sgElements[j];
895                 workRequests[j].num_sge = 1;
896                 if(j != numRecvs-1){
897                         workRequests[j].next = &workRequests[j+1];
898                 }
899                 
900         }
901         workRequests[numRecvs-1].next = NULL;
902         MACHSTATE(3,"About to call ibv_post_srq_recv");
903         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
904                 CmiAssert(0);
905         }
906
907         free(workRequests);
908         free(sgElements);
909 }
910
911
912
913
914 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
915
916 static void CmiMachineExit()
917 {
918 #if CMK_IBVERBS_STATS   
919         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
920 #endif
921
922 #if THREAD_MULTI_POOL
923         printf("IBVERBS Multi pool version\n");
924 #endif
925
926         //TODO: erase this
927         printf("Alloc time = %.8f  Lock time = %.8f count = %d\n",processBufferedTime/processBufferedCount,processLockTime/processBufferedCount,processBufferedCount);
928
929         //TODO: ERASE this
930         MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
931 }
932 static void ServiceCharmrun_nolock();
933
934 static void CmiNotifyStillIdle(CmiIdleState *s) {
935 #if CMK_SMP
936         CmiCommLock();
937 /*      if(where == COMM_SERVER_FROM_SMP)*/
938 #endif
939 /*              ServiceCharmrun_nolock();*/
940
941         CommunicationServer_nolock(0);
942 #if CMK_SMP
943         CmiCommUnlock();
944 #endif
945 }
946
947 static inline void increaseTokens(OtherNode node);
948
949 static inline int pollRecvCq(const int toBuffer);
950 static inline int pollSendCq(const int toBuffer);
951
952
953 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
954 #if !CMK_IBVERBS_TOKENS_FLOW
955         return;
956 #else
957         //if(infiData->tokensLeft == 0){
958         if(context->tokensLeft == 0){
959                 MACHSTATE(3,"GET FREE TOKENS {{{");
960         }else{
961                 return;
962         }
963         while(context->tokensLeft == 0){
964                 CommunicationServer_nolock(1); 
965         }
966         MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
967 #endif
968 }
969
970
971 /**
972         Packetize this data and send it
973 **/
974
975
976
977 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
978         int incTokens=0;
979         int retval;
980 #if     CMK_IBVERBS_DEBUG
981         packet->header.psn = (++node->infiData->psn);
982 #endif  
983
984
985
986         packet->elemList[1].addr = (uintptr_t)packet->buf;
987         packet->elemList[1].length = size;
988         packet->elemList[1].lkey = dataKey->lkey;
989         
990         
991         packet->destNode = node;
992         
993 #if CMK_IBVERBS_STATS   
994         pktCount++;
995 #endif  
996         
997         getFreeTokens(node->infiData);
998
999 #if CMK_IBVERBS_INCTOKENS       
1000         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
1001                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
1002                 incTokens=1;
1003         }
1004 #endif
1005 /*
1006         if(!checkQp(node->infiData->qp)){
1007                 pollSendCq(1);
1008                 CmiAssert(0);
1009         }*/
1010
1011         struct ibv_send_wr *bad_wr=NULL;
1012         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1013                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
1014                 CmiAssert(0);
1015         }
1016 #if     CMK_IBVERBS_TOKENS_FLOW
1017         context->tokensLeft--;
1018 #if     CMK_IBVERBS_STATS
1019         if(context->tokensLeft < minTokensLeft){
1020                 minTokensLeft = context->tokensLeft;
1021         }
1022 #endif
1023 #endif
1024
1025 /*      if(!checkQp(node->infiData->qp)){
1026                 pollSendCq(1);
1027                 CmiAssert(0);
1028         }*/
1029
1030 #if CMK_IBVERBS_INCTOKENS       
1031         if(incTokens){
1032                 increaseTokens(node);
1033         }
1034 #endif
1035
1036
1037 #if     CMK_IBVERBS_DEBUG
1038         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1039 #else
1040         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1041 #endif
1042
1043 };
1044
1045
1046 static void inline EnqueueDummyPacket(OtherNode node,int size){
1047         infiPacket packet;
1048         MallocInfiPacket(packet);
1049         packet->size = size;
1050         packet->buf = CmiAlloc(size);
1051         
1052         packet->header.code = INFIDUMMYPACKET;
1053
1054         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1055         
1056         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1057         EnqueuePacket(node,packet,size,key);
1058 }
1059
1060
1061
1062
1063
1064
1065 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1066         infiPacket packet;
1067         MallocInfiPacket(packet);
1068         packet->size = size;
1069         packet->buf=data;
1070         
1071         //the nodeNo is added at time of packet allocation
1072         packet->header.code = INFIPACKETCODE_DATA;
1073         
1074         ogm->refcount++;
1075         packet->ogm = ogm;
1076         
1077         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1078         CmiAssert(key != NULL);
1079         
1080         EnqueuePacket(node,packet,size,key);
1081 };
1082
1083 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1084 static inline void processAllBufferedMsgs();
1085
1086 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1087         int size; char *data;
1088 //      processAllBufferedMsgs();
1089
1090         
1091         ogm->refcount++;
1092   size = ogm->size;
1093   data = ogm->data;
1094
1095 #if CMK_IBVERBS_STATS   
1096         msgCount++;
1097 #endif
1098
1099         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1100         //First packet has dgram header, other packets dont
1101         
1102   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1103         
1104         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1105
1106         if(rdma && size > rdmaThreshold){
1107                         EnqueueRdmaPacket(ogm,node);
1108         }else{
1109         
1110                 while(size > dataSize){
1111                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1112                         size -= dataSize;
1113                         data += dataSize;
1114                 }
1115                 if(size > 0){
1116                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1117                 }
1118         }
1119 //#if   !CMK_SMP
1120         processAllBufferedMsgs();
1121 //#endif
1122         ogm->refcount--;
1123         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1124 }
1125
1126
1127 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1128         infiPacket packet;
1129
1130         ogm->refcount++;
1131         
1132         MallocInfiPacket(packet);
1133  
1134  {
1135                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1136
1137                 
1138                 packet->size = sizeof(struct infiRdmaPacket);
1139                 packet->buf = (char *)rdmaPacket;
1140                 
1141 /*              struct ibv_mr *key = ibv_reg_mr(context->pd,ogm->data,ogm->size,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE);*/
1142                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1143                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1144                 
1145                 packet->header.code = INFIRDMA_START;
1146                 packet->header.nodeNo = _Cmi_mynode;
1147                 packet->ogm = NULL;
1148
1149                 rdmaPacket->type = INFI_MESG;
1150                 rdmaPacket->ogm = ogm;
1151                 rdmaPacket->key = *key;
1152                 rdmaPacket->keyPtr = key;
1153                 rdmaPacket->remoteBuf = ogm->data;
1154                 rdmaPacket->remoteSize = ogm->size;
1155                 
1156                 
1157                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1158                 
1159                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1160                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1161         }
1162 }
1163
1164
1165
1166 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1167 static inline void processSendWC(struct ibv_wc *sendWC);
1168 static unsigned int _count=0;
1169 extern int errno;
1170 static int _countAsync=0;
1171 static inline void processAsyncEvents(){
1172         struct ibv_async_event event;
1173         int ready;
1174         _countAsync++;
1175         if(_countAsync < 1){
1176                 return;
1177         }
1178         _countAsync=0;
1179         FD_SET(context->context->async_fd,&context->asyncFds);
1180         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1181         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1182         
1183         if(ready==0){
1184                 return;
1185         }
1186         if(ready == -1){
1187 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1188                 return;
1189         }
1190         
1191         if (ibv_get_async_event(context->context, &event)){
1192                 return;
1193                 CmiAbort("get async event failed");
1194         }
1195         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1196         ibv_ack_async_event(&event);
1197
1198         
1199 }
1200
1201 static void pollCmiDirectQ();
1202
1203 static inline  void CommunicationServer_nolock(int toBuffer) {
1204         int processed;
1205         if(_Cmi_numnodes <= 1){
1206                 pollCmiDirectQ();
1207                 return;
1208         }
1209         MACHSTATE(2,"CommServer_nolock{");
1210         
1211 //      processAsyncEvents();
1212         
1213 //      checkAllQps();
1214
1215         pollCmiDirectQ();
1216
1217         processed = pollRecvCq(toBuffer);
1218         
1219
1220         processed += pollSendCq(toBuffer);
1221         
1222         if(toBuffer == 0){
1223 //              if(processed != 0)
1224                         processAllBufferedMsgs();
1225         }
1226         
1227 //      checkAllQps();
1228 //      _count--;
1229         MACHSTATE(2,"} CommServer_nolock ne");
1230         
1231 }
1232 /*
1233 static inline infiBufferedWC createInfiBufferedWC(){
1234         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1235         ret->count = 0;
1236         ret->next = ret->prev =NULL;
1237         return ret;
1238 }*/
1239
1240 /****
1241         The buffered recvWC are stored in a doubly linked list of 
1242         arrays or blocks of wcs.
1243         To keep the average insert cost low, a new block is added 
1244         to the top of the list. (resulting in a reverse seq of blocks)
1245         Within a block however wc are stored in a sequence
1246 *****/
1247 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1248         infiBufferedWC block;
1249         MACHSTATE(3,"Insert Buffered Recv called");
1250         if( context->infiBufferedRecvList ==NULL){
1251                 context->infiBufferedRecvList = createInfiBufferedWC();
1252                 block = context->infiBufferedRecvList;
1253         }else{
1254                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1255                         block = createInfiBufferedWC();
1256                         context->infiBufferedRecvList->prev = block;
1257                         block->next = context->infiBufferedRecvList;
1258                         context->infiBufferedRecvList = block;
1259                 }else{
1260                         block = context->infiBufferedRecvList;
1261                 }
1262         }
1263         
1264         block->wcList[block->count] = *wc;
1265         block->count++;
1266 };
1267 */
1268
1269
1270 /********
1271 go through the blocks of bufferedWC. Process the last block first.
1272 Then the next one and so on. (Processing within a block should happen
1273 in sequence).
1274 Leave the last block in place to avoid having to allocate again
1275 ******/
1276 /*static inline void processBufferedRecvList(){
1277         infiBufferedWC start;
1278         start = context->infiBufferedRecvList;
1279         while(start->next != NULL){
1280                 start = start->next;
1281         }
1282         while(start != NULL){
1283                 int i=0;
1284                 infiBufferedWC tmp;
1285                 for(i=0;i<start->count;i++){
1286                         processRecvWC(&start->wcList[i]);
1287                 }
1288                 if(start != context->infiBufferedRecvList){
1289                         //not the first one
1290                         tmp = start;
1291                         start = start->prev;
1292                         free(tmp);
1293                         start->next = NULL;
1294                 }else{
1295                         start = start->prev;
1296                 }
1297         }
1298         context->infiBufferedRecvList->next = NULL;
1299         context->infiBufferedRecvList->prev = NULL;
1300         context->infiBufferedRecvList->count = 0;
1301 }
1302 */
1303
1304 static inline int pollRecvCq(const int toBuffer){
1305         int i;
1306         int ne;
1307         struct ibv_wc wc[WC_LIST_SIZE];
1308         
1309         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1310         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1311 //      CmiAssert(ne >=0);
1312         
1313         if(ne != 0){
1314                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1315         }
1316         
1317         for(i=0;i<ne;i++){
1318                 if(wc[i].status != IBV_WC_SUCCESS){
1319                         CmiAssert(0);
1320                 }
1321                 switch(wc[i].opcode){
1322                         case IBV_WC_RECV:
1323                                         processRecvWC(&wc[i],toBuffer);
1324                                 break;
1325                         default:
1326                                 CmiAbort("Wrong type of work completion object in recvq");
1327                                 break;
1328                 }
1329                         
1330         }
1331         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1332         return ne;
1333
1334 }
1335
1336 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1337
1338 static inline int pollSendCq(const int toBuffer){
1339         int i;
1340         int ne;
1341         struct ibv_wc wc[WC_LIST_SIZE];
1342
1343         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1344 //      CmiAssert(ne >=0);
1345         
1346         
1347         for(i=0;i<ne;i++){
1348                 if(wc[i].status != IBV_WC_SUCCESS){
1349                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1350 #if CMK_IBVERBS_STATS
1351         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1352 #endif
1353                         CmiAssert(0);
1354                 }
1355                 switch(wc[i].opcode){
1356                         case IBV_WC_SEND:{
1357                                 //message received
1358                                 processSendWC(&wc[i]);
1359                                 
1360                                 break;
1361                                 }
1362                         case IBV_WC_RDMA_READ:
1363                         {
1364 //                              processRdmaWC(&wc[i],toBuffer);
1365                                         processRdmaWC(&wc[i],1);
1366                                 break;
1367                         }
1368                         case IBV_WC_RDMA_WRITE:
1369                         {
1370                                 /*** used for CmiDirect puts 
1371                                 Nothing needs to be done on the sender side once send is done **/
1372                                 break;
1373                         }
1374                         default:
1375                                 CmiAbort("Wrong type of work completion object in recvq");
1376                                 break;
1377                 }
1378                         
1379         }
1380         return ne;
1381 }
1382
1383
1384 /******************
1385 Check the communication server socket and
1386
1387 *****************/
1388 int CheckSocketsReady(int withDelayMs)
1389 {   
1390   int nreadable;
1391   CMK_PIPE_DECL(withDelayMs);
1392
1393   CmiStdoutAdd(CMK_PIPE_SUB);
1394   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1395
1396   nreadable=CMK_PIPE_CALL();
1397   ctrlskt_ready_read = 0;
1398   dataskt_ready_read = 0;
1399   dataskt_ready_write = 0;
1400   
1401   if (nreadable == 0) {
1402     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1403     return nreadable;
1404   }
1405   if (nreadable==-1) {
1406     CMK_PIPE_CHECKERR();
1407     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1408     return CheckSocketsReady(0);
1409   }
1410   CmiStdoutCheck(CMK_PIPE_SUB);
1411   if (Cmi_charmrun_fd!=-1) 
1412           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1413   MACHSTATE(1,"} CheckSocketsReady")
1414   return nreadable;
1415 }
1416
1417
1418 /*** Service the charmrun socket
1419 *************/
1420
1421 static void ServiceCharmrun_nolock()
1422 {
1423   int again = 1;
1424   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1425   while (again)
1426   {
1427   again = 0;
1428   CheckSocketsReady(0);
1429   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1430   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1431   }
1432   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1433 }
1434
1435
1436
1437 static void CommunicationServer(int sleepTime, int where){
1438         if( where == COMM_SERVER_FROM_INTERRUPT){
1439                 return;
1440         }
1441 #if CMK_SMP
1442         if(where == COMM_SERVER_FROM_WORKER){
1443                 return;
1444         }
1445         if(where == COMM_SERVER_FROM_SMP){
1446 #endif
1447                 ServiceCharmrun_nolock();
1448 #if CMK_SMP
1449         }
1450         CmiCommLock();
1451 #endif
1452         CommunicationServer_nolock(0);
1453 #if CMK_SMP
1454         CmiCommUnlock();
1455 #endif
1456 }
1457
1458
static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);


void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);

/**
 * Reassembles packets from node `nodeNo` into complete Converse messages.
 *
 * `msg`/`len` is the payload of one data packet, without the infiniband
 * packet header. The first packet of a message starts with the datagram
 * header. The total message length is read from it with
 * CmiMsgHeaderGetLength(), and a buffer of that size is CmiAlloc'd.
 * Subsequent packets are appended to that buffer; every packet except the
 * last must be exactly dataSize bytes. Length inconsistencies abort.
 * Per-node reassembly state lives in node->asm_* and node->infiData->state.
 *
 * When the last packet arrives, the message is passed to handoverMessage().
 * NOTE(review): handoverMessage is always called with toBuffer = 1 here,
 * regardless of this function's toBuffer parameter; confirm this is
 * intended.
 */
static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
	char *newmsg;
	
	MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
	
	OtherNode node = &nodes[nodeNo];
	newmsg = node->asm_msg;		
	
	/// This simple state machine determines if this packet marks the beginning of a new message
	// from another node, or if this is another in a sequence of packets
	switch(node->infiData->state){
		case INFI_HEADER_DATA:
		{
			/* first packet of a new message: it carries the datagram header */
			int size;
			int rank, srcpe, seqno, magic, i;
			unsigned int broot;
			DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
			size = CmiMsgHeaderGetLength(msg);
			MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
//			CmiAssert(size > 0);
//			CmiAssert(nodes_by_pe[srcpe] == node);
			
//			CmiAssert(newmsg == NULL);
			if(len > size){
				CmiPrintf("size: %d, len:%d.\n", size, len);
				CmiAbort("\n\n\t\tLength mismatch!!\n\n");
			}
			newmsg = (char *)CmiAlloc(size);
      _MEMCHECK(newmsg);
      memcpy(newmsg, msg, len);
      node->asm_rank = rank;
      node->asm_total = size;
      node->asm_fill = len;
      node->asm_msg = newmsg;
			node->infiData->broot = broot;
			if(len == size){
				//this is the only packet for this message 
				node->infiData->state = INFI_HEADER_DATA;
			}else{
				//there are more packets following
				node->infiData->state = INFI_DATA;
			}
			break;
		}
		case INFI_DATA:
		{
			/* continuation packet: all but the last must be exactly dataSize bytes */
			if(node->asm_fill + len < node->asm_total && len != dataSize){
				CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
				CmiAbort("packet in the middle does not have expected length");
			}
			if(node->asm_fill+len > node->asm_total){
				CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
				CmiAbort("\n\n\t\tLength mismatch!!\n\n");
			}
			//TODO: remove this
			memcpy(newmsg + node->asm_fill,msg,len);
			node->asm_fill += len;
			if(node->asm_fill == node->asm_total){
				node->infiData->state = INFI_HEADER_DATA;
			}else{
				node->infiData->state = INFI_DATA;
			}
			break;
		}
	}
	/// if this packet was the last packet in a message ie state was 
	/// reset to infi_header_data
	
	if(node->infiData->state == INFI_HEADER_DATA){
		int total_size = node->asm_total;
		/* the reassembly buffer is handed over; the node is ready for a new message */
		node->asm_msg = NULL;
		handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
		MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
	}
	
};
1540
/**
 * Delivers a fully reassembled message of total_size bytes.
 *
 * Broadcast forwarding is compiled in only with
 * CMK_BROADCAST_SPANNING_TREE or CMK_BROADCAST_HYPERCUBE. For a broadcast
 * (DGRAM_BROADCAST, or DGRAM_NODEBROADCAST when node queues exist):
 * - if toBuffer is set, a copy is queued with insertBufferedBcast();
 * - otherwise the message is forwarded at once to the spanning-tree or
 *   hypercube children.
 *
 * Local delivery:
 * - DGRAM_BROADCAST: a copy goes to ranks 1.._Cmi_mynodesize-1 and the
 *   original to rank 0;
 * - node broadcast/message: the node queue;
 * - anything else: the PE `rank`.
 *
 * When toBuffer is 0, all buffered messages are then processed.
 */
void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
#if CMK_BROADCAST_SPANNING_TREE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
					 if(toBuffer){
						 	insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
						}else{
          		SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
						}
					}
#elif CMK_BROADCAST_HYPERCUBE
        if (rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || rank == DGRAM_NODEBROADCAST
#endif
         ){
					 if(toBuffer){
						 	insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
					 }else{
          		SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
						}
					}
#endif


		/* local delivery on this node */
		switch (rank) {
    	case DGRAM_BROADCAST: {
				int i;
				for (i=1; i<_Cmi_mynodesize; i++){
					CmiPushPE(i, CopyMsg(newmsg, total_size));
				}
          CmiPushPE(0, newmsg);
          break;
      }
#if CMK_NODE_QUEUE_AVAILABLE
        case DGRAM_NODEBROADCAST: 
        case DGRAM_NODEMESSAGE: {
          CmiPushNode(newmsg);
          break;
        }
#endif
        default:
				{
					
          CmiPushPE(rank, newmsg);
				}
  	}    /* end of switch */
		if(!toBuffer){
//#if !CMK_SMP		
		processAllBufferedMsgs();
//#endif
		}
}
1598
1599
1600 static inline void increasePostedRecvs(int nodeNo);
1601 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1602 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1603
1604 //struct infiDirectRequestPacket;
1605 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1606
1607 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1608         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1609         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1610         int nodeNo = header->nodeNo;
1611 #if     CMK_IBVERBS_DEBUG
1612         OtherNode node = &nodes[nodeNo];
1613 #endif
1614         
1615         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1616 #if     CMK_IBVERBS_DEBUG
1617         if(node->infiData->recvPsn == 0){
1618                 node->infiData->recvPsn = header->psn;
1619         }else{
1620                 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1621                 node->infiData->recvPsn++;
1622         }
1623         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1624 #else
1625         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1626 #endif
1627         
1628         if(header->code & INFIPACKETCODE_DATA){
1629                         
1630                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1631         }
1632         if(header->code & INFIDUMMYPACKET){
1633                 MACHSTATE(3,"Dummy packet");
1634         }
1635         if(header->code & INFIBARRIERPACKET){
1636                 MACHSTATE(3,"Barrier packet");
1637                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1638         }
1639
1640 #if CMK_IBVERBS_INCTOKENS       
1641         if(header->code & INFIPACKETCODE_INCTOKENS){
1642                 increasePostedRecvs(nodeNo);
1643         }
1644 #endif  
1645         if(rdma && header->code & INFIRDMA_START){
1646                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1647 //              if(toBuffer){
1648                         //TODO: make a function of this and use for both acks and requests
1649                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1650                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1651                         *copyPacket = *rdmaPacket;
1652                         copyPacket->fromNodeNo = nodeNo;
1653                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1654                         context->bufferedRdmaRequests = copyPacket;
1655                         copyPacket->next = tmp;
1656                         copyPacket->prev = NULL;
1657                         if(tmp != NULL){
1658                                 tmp->prev = copyPacket;
1659                         }
1660 /*              }else{
1661                         processRdmaRequest(rdmaPacket,nodeNo,0);
1662                 }*/
1663         }
1664         if(rdma && header->code & INFIRDMA_ACK){
1665                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1666                 processRdmaAck(rdmaPacket);
1667         }
1668 /*      if(header->code & INFIDIRECT_REQUEST){
1669                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1670                 processDirectRequest(directRequestPacket);
1671         }*/
1672         {
1673                 struct ibv_sge list = {
1674                         .addr   = (uintptr_t) buffer->buf,
1675                         .length = buffer->size,
1676                         .lkey   = buffer->key->lkey
1677                 };
1678         
1679                 struct ibv_recv_wr wr = {
1680                         .wr_id = (uint64_t)buffer,
1681                         .sg_list = &list,
1682                         .num_sge = 1,
1683                         .next = NULL
1684                 };
1685                 struct ibv_recv_wr *bad_wr;
1686         
1687                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1688                         CmiAssert(0);
1689                 }
1690         }
1691
1692 };
1693
1694
1695
1696
1697 static inline  void processSendWC(struct ibv_wc *sendWC){
1698
1699         infiPacket packet = (infiPacket )sendWC->wr_id;
1700 #if CMK_IBVERBS_TOKENS_FLOW
1701 //      packet->destNode->infiData->tokensLeft++;
1702         context->tokensLeft++;
1703 #endif
1704
1705         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1706         if(packet->ogm != NULL){
1707                 packet->ogm->refcount--;
1708                 if(packet->ogm->refcount == 0){
1709                         GarbageCollectMsg(packet->ogm); 
1710                 }
1711         }else{
1712                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1713
1714                 }
1715         }
1716
1717         FreeInfiPacket(packet);
1718 };
1719
1720
1721
1722 /********************************************************************/
1723 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1724         int nodeNo = fromNodeNo;
1725         OtherNode node = &nodes[nodeNo];
1726         struct infiRdmaPacket *rdmaPacket;
1727
1728         getFreeTokens(node->infiData);
1729 #if CMK_IBVERBS_TOKENS_FLOW
1730 //      node->infiData->tokensLeft--;
1731         context->tokensLeft--;
1732 #if     CMK_IBVERBS_STATS
1733         if(context->tokensLeft < minTokensLeft){
1734                 minTokensLeft = context->tokensLeft;
1735         }
1736 #endif
1737 #endif
1738         
1739         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1740 //      CmiAssert(buffer != NULL);
1741         
1742         
1743         if(isBuffered){
1744                 rdmaPacket = _rdmaPacket;
1745         }else{
1746                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1747                 *rdmaPacket = *_rdmaPacket;
1748         }
1749
1750
1751         rdmaPacket->fromNodeNo = fromNodeNo;
1752         rdmaPacket->localBuffer = (void *)buffer;
1753         
1754         buffer->type = BUFFER_RDMA;
1755         buffer->size = rdmaPacket->remoteSize;
1756         
1757         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1758 //      CmiAssert(buffer->buf != NULL);
1759
1760         buffer->key = METADATAFIELD(buffer->buf)->key;
1761
1762         
1763         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1764         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1765 //      CmiAssert(buffer->key != NULL);
1766         
1767         {
1768                 struct ibv_sge list = {
1769                         .addr = (uintptr_t )buffer->buf,
1770                         .length = buffer->size,
1771                         .lkey   = buffer->key->lkey
1772                 };
1773
1774                 struct ibv_send_wr *bad_wr;
1775                 struct ibv_send_wr wr = {
1776                         .wr_id = (uint64_t )rdmaPacket,
1777                         .sg_list = &list,
1778                         .num_sge = 1,
1779                         .opcode = IBV_WR_RDMA_READ,
1780                         .send_flags = IBV_SEND_SIGNALED,
1781                         .wr.rdma = {
1782                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1783                                 .rkey = rdmaPacket->key.rkey
1784                         }
1785                 };
1786                 /** post and rdma_read that is a rdma get*/
1787                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1788                         CmiAssert(0);
1789                 }
1790         }
1791
1792 };
1793
/* Forward declarations: processRdmaWC below calls EnqueueRdmaAck before its definition. */
static inline void processDirectWC(struct infiRdmaPacket *pkt);
static inline void EnqueueRdmaAck(struct infiRdmaPacket *pkt);
1796
1797 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1798                 //rdma get done
1799 #if CMK_IBVERBS_STATS
1800         double _startRegTime;
1801 #endif  
1802
1803         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1804 /*      if(rdmaPacket->type == INFI_DIRECT){
1805                 processDirectWC(rdmaPacket);
1806                 return;
1807         }*/
1808 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1809         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1810
1811         /*TODO: remove this
1812         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1813         
1814 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1815         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1816         
1817         {
1818                 int size;
1819                 int rank, srcpe, seqno, magic, i;
1820                 unsigned int broot;
1821                 char *msg = buffer->buf;
1822                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1823                 size = CmiMsgHeaderGetLength(msg);
1824 /*              CmiAssert(size == buffer->size);*/
1825                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1826         }
1827         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1828
1829         
1830         free(buffer);
1831         
1832         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1833         //we are sending this ack as a response to a successful
1834         // rdma_Read.. the token for that rdma_Read needs to be freed
1835 #if CMK_IBVERBS_TOKENS_FLOW     
1836         //node->infiData->tokensLeft++;
1837         context->tokensLeft++;
1838 #endif
1839
1840         //send ack to sender if toBuffer is off otherwise buffer it
1841         if(toBuffer){
1842                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1843                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1844                 context->bufferedRdmaAcks = rdmaPacket;
1845                 rdmaPacket->next = tmp;
1846                 rdmaPacket->prev = NULL;
1847                 if(tmp != NULL){
1848                         tmp->prev = rdmaPacket; 
1849                 }
1850         }else{
1851                 EnqueueRdmaAck(rdmaPacket);             
1852                 free(rdmaPacket);
1853         }
1854 }
1855
1856 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1857         infiPacket packet;
1858         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1859
1860         
1861         MallocInfiPacket(packet);
1862         {
1863                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1864                 *ackPacket = *rdmaPacket;
1865                 packet->size = sizeof(struct infiRdmaPacket);
1866                 packet->buf = (char *)ackPacket;
1867                 packet->header.code = INFIRDMA_ACK;
1868                 packet->ogm=NULL;
1869                 
1870                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1871         
1872
1873                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1874         }
1875 };
1876
1877
1878 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1879         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1880         rdmaPacket->ogm->refcount--;
1881         GarbageCollectMsg(rdmaPacket->ogm);
1882 }
1883
1884
1885 /****************************
1886  Deal with all the buffered (delayed) messages
1887  such as processing recvd broadcasts, sending
1888  rdma acks and processing recvd rdma requests
1889 ******************************/
1890
1891
1892 static inline infiBufferedBcastPool createBcastPool(){
1893         int i;
1894         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1895         ret->count = 0;
1896         ret->next = ret->prev = NULL;   
1897         for(i=0;i<BCASTLIST_SIZE;i++){
1898                 ret->bcastList[i].valid = 0;
1899         }
1900         return ret;
1901 };
1902 /****
1903         The buffered bcast messages are stored in a doubly linked list of 
1904         arrays or blocks.
1905         To keep the average insert cost low, a new block is added 
1906         to the top of the list. (resulting in a reverse seq of blocks)
1907         Within a block however bcast are stored in increasing order sequence
1908 *****/
1909
1910 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1911         if(context->bufferedBcastList == NULL){
1912                 context->bufferedBcastList = createBcastPool();
1913         }else{
1914                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1915                         infiBufferedBcastPool tmp;
1916                         tmp = createBcastPool();
1917                         context->bufferedBcastList->prev = tmp;
1918                         tmp->next = context->bufferedBcastList;
1919                         context->bufferedBcastList = tmp;
1920                 }
1921         }
1922         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1923         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1924         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1925         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1926         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1927         
1928         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1929         
1930         context->bufferedBcastList->count++;
1931 }
1932
1933 /*********
1934         Go through the blocks of buffered bcast messages. process last block first
1935         processign within a block is in sequence though
1936 *********/
1937 static inline void processBufferedBcast(){
1938         infiBufferedBcastPool start;
1939
1940         if(context->bufferedBcastList == NULL){
1941                 return;
1942         }
1943         start = context->bufferedBcastList;
1944         if(context->insideProcessBufferedBcasts==1){
1945                 return;
1946         }
1947         context->insideProcessBufferedBcasts=1;
1948
1949         while(start->next != NULL){
1950                 start = start->next;
1951         }
1952         
1953         while(start != NULL){
1954                 int i=0;
1955                 infiBufferedBcastPool tmp;
1956                 if(start->count != 0){
1957                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1958                 }
1959                 for(i=0;i<start->count;i++){
1960                         if(start->bcastList[i].valid == 0){
1961                                 continue;
1962                         }
1963                         start->bcastList[i].valid=0;
1964                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1965 #if CMK_BROADCAST_SPANNING_TREE
1966         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1967 #if CMK_NODE_QUEUE_AVAILABLE
1968           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1969 #endif
1970          ){
1971                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
1972                                         }
1973 #elif CMK_BROADCAST_HYPERCUBE
1974         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1975 #if CMK_NODE_QUEUE_AVAILABLE
1976           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1977 #endif
1978          ){
1979                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
1980                                         }
1981 #endif
1982                 }
1983                 if(start->count != 0){
1984                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1985                 }
1986                 
1987                 tmp = start;
1988                 start = start->prev;
1989                 free(tmp);
1990                 if(start != NULL){
1991                         //not the first one
1992                         start->next = NULL;
1993                 }
1994         }
1995
1996         context->bufferedBcastList = NULL;
1997 /*      context->bufferedBcastList->prev = NULL;
1998         context->bufferedBcastList->count =0;   */
1999         context->insideProcessBufferedBcasts=0;
2000         MACHSTATE(2,"processBufferedBcast done ");
2001 };
2002
2003
2004 static inline void processBufferedRdmaAcks(){
2005         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
2006         if(start == NULL){
2007                 return;
2008         }
2009         while(start->next != NULL){
2010                 start = start->next;
2011         }
2012         while(start != NULL){
2013                 struct infiRdmaPacket *rdmaPacket=start;
2014                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
2015                 EnqueueRdmaAck(rdmaPacket);
2016                 start = start->prev;
2017                 free(rdmaPacket);
2018         }
2019         context->bufferedRdmaAcks=NULL;
2020 }
2021
2022
2023
2024 static inline void processBufferedRdmaRequests(){
2025         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2026         if(start == NULL){
2027                 return;
2028         }
2029         
2030         
2031         while(start->next != NULL){
2032                 start = start->next;
2033         }
2034         while(start != NULL){
2035                 struct infiRdmaPacket *rdmaPacket=start;
2036                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2037                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2038                 start = start->prev;
2039         }
2040         
2041         context->bufferedRdmaRequests=NULL;
2042 }
2043
2044
2045
2046
2047
/**
 * Flush everything that was deferred, in this order: buffered broadcasts,
 * buffered RDMA acks, buffered RDMA requests. With CMK_IBVERBS_STATS, the
 * call count and elapsed time are accumulated.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	/* local timer; does not touch the file-scope _startTime */
	const double phaseStart = CmiWallTimer();
	processBufferedCount++;
#endif
	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();
#if CMK_IBVERBS_STATS
	processBufferedTime += CmiWallTimer() - phaseStart;
#endif
}
2061
2062
2063 /*************************
2064         Increase tokens when short of them
2065 **********/
2066 static inline void increaseTokens(OtherNode node){
2067         int err;
2068         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2069         if(node->infiData->totalTokens + increase > maxTokens){
2070                 increase = maxTokens-node->infiData->totalTokens;
2071         }
2072         node->infiData->totalTokens += increase;
2073         node->infiData->tokensLeft += increase;
2074         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2075         //increase the size of the sendCq
2076         int currentCqSize = context->sendCqSize;
2077         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2078                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2079                 CmiAssert(0);
2080         }
2081         context->sendCqSize+= increase;
2082 };
2083
2084 static void increasePostedRecvs(int nodeNo){
2085         OtherNode node = &nodes[nodeNo];
2086         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2087         int recvIncrease = tokenIncrease;
2088         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2089                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2090         }
2091         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2092                 recvIncrease = maxRecvBuffers-context->srqSize;
2093         }
2094         node->infiData->postedRecvs+= recvIncrease;
2095         context->srqSize += recvIncrease;
2096         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2097         //increase the size of the recvCq
2098         int currentCqSize = context->recvCqSize;
2099         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2100                 CmiAssert(0);
2101         }
2102         context->recvCqSize += tokenIncrease;
2103         if(recvIncrease > 0){
2104                 //create another bufferPool and attach it to the top of the current one
2105                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2106                 newPool->next = context->recvBufferPool;
2107                 context->recvBufferPool = newPool;
2108                 postInitialRecvs(newPool,recvIncrease,packetSize);
2109         }
2110
2111 };
2112
2113
2114
2115
2116 /*********************************************
2117         Memory management routines for RDMA
2118
2119 ************************************************/
2120
2121 /**
2122         There are INFINUMPOOLS of memory.
2123         The first pool is of size firstBinSize.
2124         The ith pool is of size firstBinSize*2^i
2125 */
2126
2127 static void initInfiCmiChunkPools(){
2128         int i,j;
2129         int size = firstBinSize;
2130         int nodeSize;
2131
2132 #if THREAD_MULTI_POOL
2133         nodeSize = CmiMyNodeSize() + 1;
2134         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2135         for(i = 0; i < nodeSize; i++){
2136                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2137         }
2138         for(j = 0; j < nodeSize; j++){
2139                 size = firstBinSize;
2140                 for(i=0;i<INFINUMPOOLS;i++){
2141                         infiCmiChunkPools[j][i].size = size;
2142                         infiCmiChunkPools[j][i].startBuf = NULL;
2143                         infiCmiChunkPools[j][i].count = 0;
2144                         size *= 2;
2145                 }
2146         }
2147
2148         // creating the n^2 system of queues
2149         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2150         for(i = 0; i < nodeSize; i++){
2151                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2152         }
2153         for(i = 0; i < nodeSize; i++)
2154                 for(j = 0; j < nodeSize; j++)
2155                         queuePool[i][j] = PCQueueCreate();
2156
2157 #else
2158
2159         size = firstBinSize;    
2160         for(i=0;i<INFINUMPOOLS;i++){
2161                 infiCmiChunkPools[i].size = size;
2162                 infiCmiChunkPools[i].startBuf = NULL;
2163                 infiCmiChunkPools[i].count = 0;
2164                 size *= 2;
2165         }
2166 #endif
2167
2168 }
2169
2170 /***********
2171 Register memory for a part of a received multisend message
2172 *************/
2173 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2174         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2175         char *res=msg-sizeof(infiCmiChunkHeader);
2176         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2177         metaData->owner = NULL;
2178         metaData->poolIdx = INFIMULTIPOOL;
2179
2180         return metaData;
2181 };
2182
2183
2184 #if THREAD_MULTI_POOL
2185
2186 // Fills up the buffer pools for every thread in the node
2187 static inline void fillBufferPools(){
2188         int nodeSize, poolIdx, thread;
2189         infiCmiChunkMetaData *metaData;         
2190         infiCmiChunkHeader *hdr;
2191         int allocSize;
2192         int count=1;
2193         int i;
2194         struct ibv_mr *key;
2195         void *res;
2196
2197         // initializing values
2198         nodeSize = CmiMyNodeSize() + 1;
2199
2200         // iterating over all threads and all pools
2201         for(thread = 0; thread < nodeSize; thread++){
2202                 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2203                         allocSize = infiCmiChunkPools[thread][poolIdx].size;
2204                         if(poolIdx < blockThreshold){
2205                                 count = blockAllocRatio;
2206                         }else{
2207                                 count = 1;
2208                         }
2209                         res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2210                         hdr = res;
2211                         key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2212                         CmiAssert(key != NULL);
2213                         res += sizeof(infiCmiChunkHeader);
2214                         for(i=0;i<count;i++){
2215                                 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2216                                 metaData->key = key;
2217                                 metaData->owner = hdr;
2218                                 metaData->poolIdx = poolIdx;
2219                                 metaData->parentPe = thread;                                            // setting the parent PE
2220                                 if(i == 0){
2221                                         metaData->owner->metaData->count = count;
2222                                         metaData->nextBuf = NULL;
2223                                 }else{
2224                                         void *startBuf = res - sizeof(infiCmiChunkHeader);
2225                                         metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2226                                         infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2227                                         infiCmiChunkPools[thread][poolIdx].count++;
2228                                 }
2229                                 if(i != count-1){
2230                                         res += (allocSize+sizeof(infiCmiChunkHeader));
2231                                 }
2232                         }
2233                 }
2234         }       
2235 }
2236
2237 static inline void *getInfiCmiChunkThread(int dataSize){
2238         //find out to which pool this dataSize belongs to
2239         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2240         int ratio = dataSize/firstBinSize;
2241         int poolIdx=0;
2242         void *res;
2243         int i,j,nodeSize;
2244         void *pointer;
2245
2246         //printf("Hi\n");
2247         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2248         MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2249         
2250         while(ratio > 0){
2251                 ratio  = ratio >> 1;
2252                 poolIdx++;
2253         }
2254         MACHSTATE1(2,"This is %d",CmiMyRank());
2255         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2256
2257         // checking whether to analyze the free queues to reuse buffers 
2258         nodeSize = CmiMyNodeSize() + 1;
2259         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2260                 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2261                 for(i = 0; i < nodeSize; i++){
2262                         if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2263                                 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2264                                         pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2265                                         infi_CmiFreeDirect(pointer);    
2266                                 }
2267                         }
2268                 }       
2269         }
2270
2271         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2272                 infiCmiChunkMetaData *metaData;         
2273                 infiCmiChunkHeader *hdr;
2274                 int allocSize;
2275                 int count=1;
2276                 int i;
2277                 struct ibv_mr *key;
2278                 void *origres;
2279                 
2280                 
2281                 if(poolIdx < INFINUMPOOLS ){
2282                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2283                 }else{
2284                         allocSize = dataSize;
2285                 }
2286
2287                 if(poolIdx < blockThreshold){
2288                         count = blockAllocRatio;
2289                 }
2290                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2291                 hdr = res;
2292                 
2293                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2294                 CmiAssert(key != NULL);
2295                 
2296                 origres = (res += sizeof(infiCmiChunkHeader));
2297
2298                 for(i=0;i<count;i++){
2299                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2300                         metaData->key = key;
2301                         metaData->owner = hdr;
2302                         metaData->poolIdx = poolIdx;
2303                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2304
2305                         if(i == 0){
2306                                 metaData->owner->metaData->count = count;
2307                                 metaData->nextBuf = NULL;
2308                         }else{
2309                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2310                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2311                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2312                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2313                                 
2314                         }
2315                         if(i != count-1){
2316                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2317                         }
2318           }     
2319                 
2320                 
2321                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2322                 
2323                 return origres;
2324         }
2325         if(poolIdx < INFINUMPOOLS){
2326                 infiCmiChunkMetaData *metaData;                         
2327         
2328                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2329                 res += sizeof(infiCmiChunkHeader);
2330
2331                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2332                 metaData = METADATAFIELD(res);
2333
2334                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2335                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2336                 
2337                 metaData->nextBuf = NULL;
2338 //              CmiAssert(metaData->poolIdx == poolIdx);
2339
2340                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2341                 return res;
2342         }
2343
2344         CmiAssert(0);
2345
2346         
2347 };
2348 #else
2349 static inline void *getInfiCmiChunk(int dataSize){
2350         //find out to which pool this dataSize belongs to
2351         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2352         int ratio = dataSize/firstBinSize;
2353         int poolIdx=0;
2354         void *res;
2355
2356         while(ratio > 0){
2357                 ratio  = ratio >> 1;
2358                 poolIdx++;
2359         }
2360         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2361         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2362                 infiCmiChunkMetaData *metaData;
2363                 infiCmiChunkHeader *hdr;
2364                 int allocSize;
2365                 int count=1;
2366                 int i;
2367                 struct ibv_mr *key;
2368                 void *origres;
2369
2370
2371                 if(poolIdx < INFINUMPOOLS ){
2372                         allocSize = infiCmiChunkPools[poolIdx].size;
2373                 }else{
2374                         allocSize = dataSize;
2375                 }
2376
2377                 if(poolIdx < blockThreshold){
2378                         count = blockAllocRatio;
2379                 }
2380                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2381                 hdr = res;
2382
2383                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2384                 CmiAssert(key != NULL);
2385
2386                 origres = (res += sizeof(infiCmiChunkHeader));
2387
2388                 for(i=0;i<count;i++){
2389                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2390                         metaData->key = key;
2391                         metaData->owner = hdr;
2392                         metaData->poolIdx = poolIdx;
2393
2394                         if(i == 0){
2395                                 metaData->owner->metaData->count = count;
2396                                 metaData->nextBuf = NULL;
2397                         }else{
2398                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2399                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2400                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2401                                 infiCmiChunkPools[poolIdx].count++;
2402
2403                         }
2404                         if(i != count-1){
2405                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2406                         }
2407           }
2408
2409
2410                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2411
2412                 return origres;
2413         }
2414         if(poolIdx < INFINUMPOOLS){
2415                 infiCmiChunkMetaData *metaData;
2416
2417                 res = infiCmiChunkPools[poolIdx].startBuf;
2418                 res += sizeof(infiCmiChunkHeader);
2419
2420                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2421                 metaData = METADATAFIELD(res);
2422
2423                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2424                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2425
2426                 metaData->nextBuf = NULL;
2427 //              CmiAssert(metaData->poolIdx == poolIdx);
2428
2429                 infiCmiChunkPools[poolIdx].count--;
2430                 return res;
2431         }
2432
2433         CmiAssert(0);
2434
2435
2436 };
2437 #endif
2438
2439
2440 void * infi_CmiAlloc(int size){
2441         void *res;
2442
2443         //TODO: erase this
2444         _startTime = CmiWallTimer();    
2445
2446 #if THREAD_MULTI_POOL
2447         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2448         res -= sizeof(CmiChunkHeader);
2449
2450         //TODO: erase this      
2451         processBufferedTime += CmiWallTimer() - _startTime;
2452         processBufferedCount++;
2453
2454         return res;
2455 #else
2456 #if CMK_SMP
2457         _auxTime = CmiWallTimer();
2458         CmiMemLock();
2459         processLockTime += CmiWallTimer() - _auxTime;
2460 #endif
2461 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2462                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2463
2464                 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));     
2465                 res -= sizeof(CmiChunkHeader);
2466 #if CMK_SMP     
2467         _auxTime = CmiWallTimer();
2468         CmiMemUnlock();
2469         processLockTime += CmiWallTimer() - _auxTime;
2470 #endif
2471 /*      }else{
2472                 res = malloc(size);
2473         }*/
2474         
2475         //TODO: erase this      
2476         processBufferedTime += CmiWallTimer() - _startTime;
2477         processBufferedCount++;
2478
2479         return res;
2480 #endif
2481 }
2482
2483 #if THREAD_MULTI_POOL
2484 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2485 void infi_CmiFreeDirect(void *ptr){
2486         int size;
2487         int parentPe;
2488         void *freePtr = ptr;
2489
2490         //ptr += sizeof(CmiChunkHeader);
2491         size = SIZEFIELD (ptr);
2492 /*      if(size > firstBinSize){*/
2493         infiCmiChunkMetaData *metaData;
2494         int poolIdx;
2495         //there is a infiniband specific header
2496         freePtr = ptr - sizeof(infiCmiChunkHeader);
2497         metaData = METADATAFIELD(ptr);
2498         poolIdx = metaData->poolIdx;
2499
2500         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2501 //      CmiAssert(poolIdx >= 0);
2502         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].count <= INFIMAXPERPOOL){
2503                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2504                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = freePtr;
2505                         infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2506
2507                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf,infiCmiChunkPools[CmiMyRank()][poolIdx].count);
2508                 }else{
2509                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2510                         metaData->owner->metaData->count--;
2511                         if(metaData->owner->metaData == metaData){
2512                                 //I am the owner
2513                                 if(metaData->owner->metaData->count == 0){
2514                                         //all the chunks have been freed
2515                                         ibv_dereg_mr(metaData->key);
2516                                         free(freePtr);
2517                                         free(metaData);
2518                                 }
2519                                 //if I am the owner and all the chunks have not been
2520                                 // freed dont free my metaData. will need later
2521                         }else{
2522                                 if(metaData->owner->metaData->count == 0){
2523                                         //need to free the owner's buffer and metadata
2524                                         freePtr = metaData->owner;
2525                                         ibv_dereg_mr(metaData->key);
2526                                         free(metaData->owner->metaData);
2527                                         free(freePtr);
2528                                 }
2529                                 free(metaData);
2530                         }
2531                 }
2532 }
2533
2534
2535 void infi_CmiFree(void *ptr){
2536         int i,j;
2537         int size;
2538         int parentPe;
2539         int nodeSize;
2540         void *pointer;
2541         void *freePtr = ptr;
2542         nodeSize = CmiMyNodeSize() + 1;
2543
2544         MACHSTATE(3,"Freeing");
2545
2546         ptr += sizeof(CmiChunkHeader);
2547         size = SIZEFIELD (ptr);
2548 /*      if(size > firstBinSize){*/
2549         infiCmiChunkMetaData *metaData;
2550         int poolIdx;
2551         //there is a infiniband specific header
2552         freePtr = ptr - sizeof(infiCmiChunkHeader);
2553         metaData = METADATAFIELD(ptr);
2554         poolIdx = metaData->poolIdx;
2555
2556         if(poolIdx == INFIMULTIPOOL){
2557                 /** this is a part of a received mult message  
2558                     it will be freed correctly later
2559                 **/
2560
2561                 return;
2562         }
2563
2564
2565         //TODO: erase this
2566         TESTfrees++;
2567
2568         // checking if this free operation is my responsibility
2569         parentPe = metaData->parentPe;
2570         if(parentPe != CmiMyRank()){
2571                 TESTneighbor++;
2572                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2573                 return;
2574         }
2575
2576
2577         infi_CmiFreeDirect(ptr);
2578
2579 // TODO: erase following code
2580 /*      // checking free request queues
2581         for(i = 0; i < nodeSize; i++){
2582                 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2583                         for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2584                                 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2585                                 infi_CmiFreeDirect(pointer);    
2586                         }
2587                 }
2588         }
2589 */
2590
2591 }
2592
2593 #else
2594 void infi_CmiFree(void *ptr){
2595         int size;
2596         void *freePtr = ptr;
2597         
2598 #if CMK_SMP     
2599         CmiMemLock();
2600 #endif
2601         ptr += sizeof(CmiChunkHeader);
2602         size = SIZEFIELD (ptr);
2603 /*      if(size > firstBinSize){*/
2604                 infiCmiChunkMetaData *metaData;
2605                 int poolIdx;
2606                 //there is a infiniband specific header
2607                 freePtr = ptr - sizeof(infiCmiChunkHeader);
2608                 metaData = METADATAFIELD(ptr);
2609                 poolIdx = metaData->poolIdx;
2610                 if(poolIdx == INFIMULTIPOOL){
2611                         /** this is a part of a received mult message  
2612                         it will be freed correctly later
2613                         **/
2614                         
2615                         return;
2616                 }
2617                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2618 //              CmiAssert(poolIdx >= 0);
2619                 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
2620                         metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2621                         infiCmiChunkPools[poolIdx].startBuf = freePtr;
2622                         infiCmiChunkPools[poolIdx].count++;
2623                         
2624                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2625                 }else{
2626                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2627                         metaData->owner->metaData->count--;
2628                         if(metaData->owner->metaData == metaData){
2629                                 //I am the owner
2630                                 if(metaData->owner->metaData->count == 0){
2631                                         //all the chunks have been freed
2632                                         ibv_dereg_mr(metaData->key);
2633                                         free(freePtr);
2634                                         free(metaData);
2635                                 }
2636                                 //if I am the owner and all the chunks have not been
2637                                 // freed dont free my metaData. will need later
2638                         }else{
2639                                 if(metaData->owner->metaData->count == 0){
2640                                         //need to free the owner's buffer and metadata
2641                                         freePtr = metaData->owner;
2642                                         ibv_dereg_mr(metaData->key);
2643                                         free(metaData->owner->metaData);
2644                                         free(freePtr);
2645                                 }
2646                                 free(metaData);
2647                         }
2648                 }       
2649 #if CMK_SMP     
2650         CmiMemUnlock();
2651 #endif
2652 /*      }else{
2653                 free(freePtr);
2654         }*/
2655 }
2656 #endif
2657
2658 /*********************************************************************************************
This section is for CmiDirect, a variant of persistent communication in which
the user can transfer data between processors without using Charm++ messages. This lets the user
send and receive data directly from the middle of their arrays, without any copying on either the send or the receive
side.
2663 *********************************************************************************************/
2664
/* Description of a sender's registered buffer for a CmiDirect transfer.
   NOTE(review): in this part of the file it is only referenced from
   commented-out code (the disabled RDMA-read protocol, processDirectRequest);
   the active CmiDirect_put posts an RDMA write instead. */
struct infiDirectRequestPacket{
	int senderProc;          /* sending node (the disabled code sets it from _Cmi_mynode) */
	int handle;              /* CmiDirect handle number */
	struct ibv_mr senderKey; /* copy of the sender buffer's memory registration */
	void *senderBuf;         /* address of the sender's buffer */
	int senderBufSize;       /* size of senderBuf in bytes */
};
2672
2673 #include "cmidirect.h"
2674
/* Number of handle slots in one infiDirectHandleTable block. Handle numbers
   beyond that go into further blocks chained through ->next. */
#define MAXHANDLES 512

/* forward declaration: directPollingQNode points back at its handle */
struct infiDirectHandleStruct;
2678
2679
/* Node of the CmiDirect polling queue, a singly linked list whose ends are
   headDirectPollingQ / tailDirectPollingQ. Every infiDirectHandle embeds its
   own node (pollingQNode), so nodes are never allocated separately. */
typedef struct directPollingQNodeStruct {
	struct infiDirectHandleStruct *handle;  /* handle that embeds this node */
	struct directPollingQNodeStruct *next;  /* next queued node, or NULL */
	double *lastDouble;  /* last double of the handle's receive buffer; polling compares it to initialValue */
} directPollingQNode;
2685
/* Per-handle CmiDirect state. Receive-side entries live in recvHandleTable
   (filled by CmiDirect_createHandle). Send-side entries live in
   sendHandleTable (filled by CmiDirect_assocLocalBuffer). */
typedef struct infiDirectHandleStruct{
	int id;                          /* handle number */
	void *buf;                       /* local buffer (receive or send side) */
	int size;                        /* size of buf in bytes */
	struct ibv_mr *key;              /* ibv registration of buf */
	void (*callbackFnPtr)(void *);   /* receive side: called when data arrives; NULL on the send side */
	void *callbackData;              /* argument passed to callbackFnPtr */
//	struct infiDirectRequestPacket *packet;
	struct infiDirectUserHandle userHandle; /* copy of the user-visible handle */
	struct infiRdmaPacket *rdmaPacket;      /* send side: INFI_DIRECT packet used as the RDMA work-request id */
	directPollingQNode pollingQNode;        /* receive side: embedded polling-queue node */
}	infiDirectHandle;
2698
/* Block of MAXHANDLES handles. Handle number h lives in block h/MAXHANDLES of
   a chain, slot h%MAXHANDLES (see calcHandleTableIdx). */
typedef struct infiDirectHandleTableStruct{
	infiDirectHandle handles[MAXHANDLES];
	struct infiDirectHandleTableStruct *next;  /* next block in the chain, or NULL */
} infiDirectHandleTable;
2703
2704
// CmiDirect data structures

/* polling queue of receive handles waiting for data (scanned by pollCmiDirectQ) */
directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;

/* arrays of _Cmi_numnodes handle-table chains, created on first use:
   send side indexed by receiver node, receive side indexed by sender node */
static infiDirectHandleTable **sendHandleTable=NULL;
static infiDirectHandleTable **recvHandleTable=NULL;

/* per sender node: last receive handle number handed out (-1 = none yet) */
static int *recvHandleCount=NULL;
2713
2714 void addHandleToPollingQ(infiDirectHandle *handle){
2715 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2716         directPollingQNode *newNode = &(handle->pollingQNode);
2717         newNode->handle = handle;
2718         newNode->next = NULL;
2719         if(headDirectPollingQ==NULL){
2720                 /*empty pollingQ*/
2721                 headDirectPollingQ = newNode;
2722                 tailDirectPollingQ = newNode;
2723         }else{
2724                 tailDirectPollingQ->next = newNode;
2725                 tailDirectPollingQ = newNode;
2726         }
2727 };
2728 /*
2729 infiDirectHandle *removeHandleFromPollingQ(){
2730         if(headDirectPollingQ == NULL){
2731                 //polling Q is empty
2732                 return NULL;
2733         }
2734         directPollingQNode *retNode = headDirectPollingQ;
2735         if(headDirectPollingQ == tailDirectPollingQ){
2736                 //PollingQ has one node 
2737                 headDirectPollingQ = tailDirectPollingQ = NULL;
2738         }else{
2739                 headDirectPollingQ = headDirectPollingQ->next;
2740         }
2741         infiDirectHandle *retHandle = retNode->handle;
2742         free(retNode);
2743         return retHandle;
2744 }*/
2745
2746 static inline infiDirectHandleTable **createHandleTable(){
2747         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2748         int i;
2749         for(i=0;i<_Cmi_numnodes;i++){
2750                 table[i] = NULL;
2751         }
2752         return table;
2753 }
2754
2755 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2756         *tableIdx = handle/MAXHANDLES;
2757         *idx = handle%MAXHANDLES;
2758 };
2759
/**
 * Store initialValue into the last sizeof(double) bytes of recvBuf.
 *
 * recvBufSize is the total buffer size in bytes and must be at least
 * sizeof(double). The value is copied bytewise because
 * recvBuf + recvBufSize - sizeof(double) is not necessarily aligned for a
 * double (recvBufSize may be any byte count), and storing through a
 * misaligned double pointer is undefined behavior.
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	size_t offset = (size_t)recvBufSize - sizeof(double);
	memcpy((char *)recvBuf + offset, &initialValue, sizeof(initialValue));
}
2767
2768
2769 /**
2770  To be called on the receiver to create a handle and return its number
2771 **/
2772 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2773         int newHandle;
2774         int tableIdx,idx;
2775         int i;
2776         infiDirectHandleTable *table;
2777         struct infiDirectUserHandle userHandle;
2778         
2779         CmiAssert(recvBufSize > sizeof(double));
2780         
2781         if(recvHandleTable == NULL){
2782                 recvHandleTable = createHandleTable();
2783                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2784                 for(i=0;i<_Cmi_numnodes;i++){
2785                         recvHandleCount[i] = -1;
2786                 }
2787         }
2788         if(recvHandleTable[senderNode] == NULL){
2789                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2790                 recvHandleTable[senderNode]->next = NULL;               
2791         }
2792         
2793         newHandle = ++recvHandleCount[senderNode];
2794         CmiAssert(newHandle >= 0);
2795         
2796         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2797         
2798         table = recvHandleTable[senderNode];
2799         for(i=0;i<tableIdx;i++){
2800                 if(table->next ==NULL){
2801                         table->next = malloc(sizeof(infiDirectHandleTable));
2802                         table->next->next = NULL;
2803                 }
2804                 table = table->next;
2805         }
2806         table->handles[idx].id = newHandle;
2807         table->handles[idx].buf = recvBuf;
2808         table->handles[idx].size = recvBufSize;
2809 #if CMI_DIRECT_DEBUG
2810         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2811 #endif
2812         table->handles[idx].callbackFnPtr = callbackFnPtr;
2813         table->handles[idx].callbackData = callbackData;
2814         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2815         CmiAssert(table->handles[idx].key != NULL);
2816
2817 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2818         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2819         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2820         
2821         userHandle.handle = newHandle;
2822         userHandle.recverNode = _Cmi_mynode;
2823         userHandle.senderNode = senderNode;
2824         userHandle.recverBuf = recvBuf;
2825         userHandle.recverBufSize = recvBufSize;
2826         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2827         userHandle.initialValue = initialValue;
2828         
2829         table->handles[idx].userHandle = userHandle;
2830         
2831         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2832
2833   {
2834          int index = table->handles[idx].size - sizeof(double);
2835    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2836         } 
2837         
2838         addHandleToPollingQ(&(table->handles[idx]));
2839         
2840 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2841         
2842         return userHandle;
2843 }
2844
2845 /****
2846  To be called on the sender to attach the sender's buffer to this handle
2847 ******/
2848 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2849         int tableIdx,idx;
2850         int i;
2851         int handle = userHandle->handle;
2852         int recverNode  = userHandle->recverNode;
2853
2854         infiDirectHandleTable *table;
2855
2856         if(sendHandleTable == NULL){
2857                 sendHandleTable = createHandleTable();
2858         }
2859         if(sendHandleTable[recverNode] == NULL){
2860                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2861                 sendHandleTable[recverNode]->next = NULL;
2862         }
2863         
2864         CmiAssert(handle >= 0);
2865         calcHandleTableIdx(handle,&tableIdx,&idx);
2866         
2867         table = sendHandleTable[recverNode];
2868         for(i=0;i<tableIdx;i++){
2869                 if(table->next ==NULL){
2870                         table->next = malloc(sizeof(infiDirectHandleTable));
2871                         table->next->next = NULL;
2872                 }
2873                 table = table->next;
2874         }
2875
2876         table->handles[idx].id = handle;
2877         table->handles[idx].buf = sendBuf;
2878
2879         table->handles[idx].size = sendBufSize;
2880 #if CMI_DIRECT_DEBUG
2881         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2882 #endif
2883         table->handles[idx].callbackFnPtr = table->handles[idx].callbackData = NULL;
2884         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2885         CmiAssert(table->handles[idx].key != NULL);
2886         table->handles[idx].userHandle = *userHandle;
2887         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2888         
2889         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2890         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2891         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2892         
2893         
2894 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2895         table->handles[idx].packet->senderProc = _Cmi_mynode;
2896         table->handles[idx].packet->handle = handle;
2897         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2898         table->handles[idx].packet->senderBuf = sendBuf;
2899         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2900         
2901         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2902 };
2903
2904
2905
2906
2907
2908 /****
2909 To be called on the sender to do the actual data transfer
2910 ******/
2911 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2912         int handle = userHandle->handle;
2913         int recverNode = userHandle->recverNode;
2914         if(recverNode == _Cmi_mynode){
2915                 /*when the sender and receiver are on the same
2916                 processor, just look up the sender and receiver
2917                 buffers and do a memcpy*/
2918
2919                 infiDirectHandleTable *senderTable;
2920                 infiDirectHandleTable *recverTable;
2921                 
2922                 int tableIdx,idx,i;
2923
2924                 
2925                 /*find entry for this handle in sender table*/
2926                 calcHandleTableIdx(handle,&tableIdx,&idx);
2927                 CmiAssert(sendHandleTable!= NULL);
2928                 senderTable = sendHandleTable[_Cmi_mynode];
2929                 CmiAssert(senderTable != NULL);
2930                 for(i=0;i<tableIdx;i++){
2931                         senderTable = senderTable->next;
2932                 }
2933
2934                 /**find entry for this handle in recver table*/
2935                 recverTable = recvHandleTable[recverNode];
2936                 CmiAssert(recverTable != NULL);
2937                 for(i=0;i< tableIdx;i++){
2938                         recverTable = recverTable->next;
2939                 }
2940                 
2941                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
2942                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
2943 #if CMI_DIRECT_DEBUG
2944                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
2945 #endif
2946                 // The polling Q should find you and handle the callback and pollingq entry
2947                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
2948                 
2949
2950         }else{
2951                 infiPacket packet;
2952                 int tableIdx,idx;
2953                 int i;
2954                 OtherNode node;
2955                 infiDirectHandleTable *table;
2956
2957                 calcHandleTableIdx(handle,&tableIdx,&idx);
2958
2959                 table = sendHandleTable[recverNode];
2960                 CmiAssert(table != NULL);
2961                 for(i=0;i<tableIdx;i++){
2962                         table = table->next;
2963                 }
2964
2965 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
2966 #if CMI_DIRECT_DEBUG
2967                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
2968 #endif
2969
2970                 
2971                 {
2972                         
2973                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
2974                         struct ibv_sge list = {
2975                                 .addr = (uintptr_t )table->handles[idx].buf,
2976                                 .length = table->handles[idx].size,
2977                                 .lkey   = table->handles[idx].key->lkey
2978                         };
2979                         
2980                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
2981
2982                         struct ibv_send_wr *bad_wr;
2983                         struct ibv_send_wr wr = {
2984                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
2985                                 .sg_list = &list,
2986                                 .num_sge = 1,
2987                                 .opcode = IBV_WR_RDMA_WRITE,
2988                                 .send_flags = IBV_SEND_SIGNALED,
2989                                 
2990                                 .wr.rdma = {
2991                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
2992                                         .rkey = remoteKey->rkey
2993                                 }
2994                         };
2995                         /** post and rdma_read that is a rdma get*/
2996                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
2997                                 CmiAssert(0);
2998                         }
2999                 }
3000
3001         /*      MallocInfiPacket (packet);
3002                 {
3003                         packet->size = sizeof(struct infiDirectRequestPacket);
3004                         packet->buf = (char *)(table->handles[idx].packet);
3005                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3006                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3007                 }*/
3008         }
3009
3010 };
3011
3012 /**** need not be called the first time *********/
3013 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3014   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3015 }
3016
3017 /**** need not be called the first time *********/
3018 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3019         int handle = userHandle->handle;
3020         int tableIdx,idx,i;
3021         infiDirectHandleTable *table;
3022         calcHandleTableIdx(handle,&tableIdx,&idx);
3023         
3024         table = recvHandleTable[userHandle->senderNode];
3025         CmiAssert(table != NULL);
3026         for(i=0;i<tableIdx;i++){
3027                 table = table->next;
3028         }
3029 #if CMI_DIRECT_DEBUG
3030   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3031 #endif  
3032         addHandleToPollingQ(&(table->handles[idx]));
3033         
3034
3035 }
3036
/**
 * Re-arm a receive handle for its next transfer. This resets the last double
 * of the receive buffer to initialValue (CmiDirect_readyMark) and then puts
 * the handle back on the polling queue (CmiDirect_readyPollQ). Not needed the
 * first time, because CmiDirect_createHandle already does both.
 */
void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
	CmiDirect_readyMark(userHandle);
	CmiDirect_readyPollQ(userHandle);
}
3058
3059
3060 static int receivedDirectMessage(infiDirectHandle *handle){
3061 //      int index = handle->size - sizeof(double);
3062 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
3063         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3064                 return 0;
3065         }else{
3066                 (*(handle->callbackFnPtr))(handle->callbackData);       
3067                 return 1;
3068         }
3069         
3070 }
3071
3072
3073 static void pollCmiDirectQ(){
3074         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
3075         while(ptr != NULL){
3076                 if(receivedDirectMessage(ptr->handle)){
3077 #if CMI_DIRECT_DEBUG
3078       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
3079 #endif
3080                         directPollingQNode *delPtr = ptr;
3081                         /** has been received and delete this node***/
3082                         if(prevPtr == NULL){
3083                                 /** first in the pollingQ**/
3084                                 if(headDirectPollingQ == tailDirectPollingQ){
3085                                         /**only node in pollingQ****/
3086                                         headDirectPollingQ = tailDirectPollingQ = NULL;
3087                                 }else{
3088                                         headDirectPollingQ = headDirectPollingQ->next;
3089                                 }
3090                         }else{
3091                                 if(ptr == tailDirectPollingQ){
3092                                         /**last node is being deleted**/
3093                                         tailDirectPollingQ = prevPtr;
3094                                 }
3095                                 prevPtr->next = ptr->next;
3096                         }
3097                         ptr = ptr->next;
3098                 //      free(delPtr);
3099                 }else{
3100                         prevPtr = ptr;
3101                         ptr = ptr->next;
3102                 }
3103         }
3104 }
3105
3106
3107 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3108         int senderProc = directRequestPacket->senderProc;
3109         int handle = directRequestPacket->handle;
3110         int tableIdx,idx,i;
3111         infiDirectHandleTable *table;
3112         OtherNode node = nodes_by_pe[senderProc];
3113
3114         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3115
3116         calcHandleTableIdx(handle,&tableIdx,&idx);
3117
3118         table = recvHandleTable[senderProc];
3119         CmiAssert(table != NULL);
3120         for(i=0;i<tableIdx;i++){
3121                 table = table->next;
3122         }
3123         
3124         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3125         
3126         {
3127                 struct ibv_sge list = {
3128                         .addr = (uintptr_t )table->handles[idx].buf,
3129                         .length = table->handles[idx].size,
3130                         .lkey   = table->handles[idx].key->lkey
3131                 };
3132
3133                 struct ibv_send_wr *bad_wr;
3134                 struct ibv_send_wr wr = {
3135                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3136                         .sg_list = &list,
3137                         .num_sge = 1,
3138                         .opcode = IBV_WR_RDMA_READ,
3139                         .send_flags = IBV_SEND_SIGNALED,
3140                         .wr.rdma = {
3141                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3142                                 .rkey = directRequestPacket->senderKey.rkey
3143                         }
3144                 };
3145 //       post and rdma_read that is a rdma get
3146                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3147                         CmiAssert(0);
3148                 }
3149         }
3150                         
3151         
3152 };*/
3153 /*
3154 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3155         MACHSTATE(3,"processDirectWC");
3156         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3157         (*(handle->callbackFnPtr))(handle->callbackData);
3158 };
3159 */
3160
3161 static void sendBarrierMessage(int pe)
3162 {
3163   /* we will only need one packet */
3164   int size=32;
3165   OtherNode  node = nodes + pe;
3166   infiPacket packet;
3167   MallocInfiPacket(packet);
3168   packet->size = size;
3169   packet->buf = CmiAlloc(size);
3170   packet->header.code = INFIBARRIERPACKET;
3171   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3172   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3173   /*  pollSendCq(0);*/
3174   EnqueuePacket(node,packet,size,key);
3175 }
3176
3177 static void recvBarrierMessage()
3178 {
3179   int i;
3180   int ne;
3181   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3182   struct ibv_wc wc[1];
3183   struct ibv_wc *recvWC;
3184   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3185   int toBuffer=1; // buffer without processing recvd messages
3186   int barrierReached=0;
3187   struct infiBuffer *buffer = NULL;
3188   struct infiPacketHeader *header = NULL;
3189   int nodeNo=-1;
3190   int len=-1;
3191   while(!barrierReached)
3192     {
3193       /* gengbin's semantic will implode if more than one q is polled at a time */
3194       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3195       //        CmiAssert(ne >=0);
3196       if(ne != 0){
3197         MACHSTATE1(3,"recvBarrier ne %d",ne);
3198       }
3199       pollSendCq(1); 
3200       for(i=0;i<ne;i++){
3201         if(wc[i].status != IBV_WC_SUCCESS){
3202           CmiAssert(0);
3203         }
3204         switch(wc[i].opcode){
3205         case IBV_WC_RECV: /* we have something to consider*/
3206           recvWC=&wc[i];
3207           buffer = (struct infiBuffer *) recvWC->wr_id; 
3208           header = (struct infiPacketHeader *)buffer->buf;
3209           nodeNo = header->nodeNo;
3210           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3211
3212           if(header->code & INFIPACKETCODE_DATA){
3213             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3214           }
3215           if(header->code & INFIDUMMYPACKET){
3216             MACHSTATE(3,"Dummy packet");
3217           }
3218           if(header->code & INFIBARRIERPACKET){
3219             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3220             // now we are done
3221             barrierReached=1;
3222             /* semantically questionable */
3223             //processAllBufferedMsgs();
3224             //return;
3225           }
3226           if(rdma && header->code & INFIRDMA_START){
3227             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3228             //          if(toBuffer){
3229             //TODO: make a function of this and use for both acks and requests
3230             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3231             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3232             *copyPacket = *rdmaPacket;
3233             copyPacket->fromNodeNo = nodeNo;
3234             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3235             context->bufferedRdmaRequests = copyPacket;
3236             copyPacket->next = tmp;
3237             copyPacket->prev = NULL;
3238             if(tmp != NULL){
3239               tmp->prev = copyPacket;
3240             }
3241             /*          }else{
3242                         processRdmaRequest(rdmaPacket,nodeNo,0);
3243                         }*/
3244           }
3245           if(rdma && header->code & INFIRDMA_ACK){
3246             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3247             processRdmaAck(rdmaPacket);
3248           }
3249           {
3250             struct ibv_sge list = {
3251               .addr     = (uintptr_t) buffer->buf,
3252               .length = buffer->size,
3253               .lkey     = buffer->key->lkey
3254             };
3255         
3256             struct ibv_recv_wr wr = {
3257               .wr_id = (uint64_t)buffer,
3258               .sg_list = &list,
3259               .num_sge = 1,
3260               .next = NULL
3261             };
3262             struct ibv_recv_wr *bad_wr;
3263         
3264             if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3265               CmiAssert(0);
3266             }
3267           }
3268
3269           break;
3270         default:
3271           CmiAbort("Wrong type of work completion object in recvq");
3272           break;
3273         }
3274       }
3275     }
3276   /* semantically questionable */
3277   //  processAllBufferedMsgs();
3278 }
3279
3280
3281 /* happen at node level */
3282 int CmiBarrier()
3283 {
3284   int len, size, i;
3285   int status;
3286   int count = 0;
3287   OtherNode node;
3288   int numnodes = CmiNumNodes();
3289   if (CmiMyRank() == 0) {
3290     /* every one send to pe 0 */
3291     if (CmiMyNode() != 0) {
3292       sendBarrierMessage(0);
3293     }
3294     /* printf("[%d] HERE\n", CmiMyPe()); */
3295     if (CmiMyNode() == 0) 
3296     {
3297       for (count = 1; count < numnodes; count ++) 
3298       {
3299         recvBarrierMessage();
3300       }
3301       /* pe 0 broadcast */
3302       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3303         int p = i;
3304         if (p > numnodes - 1) break;
3305         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3306         sendBarrierMessage(p);
3307       }
3308     }
3309     /* non 0 node waiting */
3310     if (CmiMyNode() != 0) 
3311     {
3312       recvBarrierMessage();
3313       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3314         int p = CmiMyNode();
3315         p = BROADCAST_SPANNING_FACTOR*p + i;
3316         if (p > numnodes - 1) break;
3317         p = p%numnodes;
3318         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3319         sendBarrierMessage(p);
3320       }
3321     }
3322   }
3323   CmiNodeAllBarrier();
3324   processAllBufferedMsgs();
3325   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3326 }
3327
/**
 * Gather-only barrier: every node reports to node 0 and goes on.
 *
 * Rank 0 of each non-zero node sends one barrier message to node 0 and does
 * not wait for a reply. Rank 0 of node 0 blocks until it has received
 * CmiNumNodes()-1 such messages. Afterwards every rank on the node
 * synchronizes via CmiNodeAllBarrier() and buffered messages are processed.
 *
 * @return 0 (the original returned nothing, which is undefined behavior
 *         for an int function whose value is used).
 */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode() != 0) {
      sendBarrierMessage(0);
    } else {
      int others = CmiNumNodes() - 1;
      for (i = 0; i < others; i++) {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  return 0;
}
3347