cc4b1d05243c48c5aae3b7ef7522245f4dc3aeca
[charm.git] / src / arch / net / machine-ibverbs.c
1 /** @file
2  * Ibverbs (infiniband)  implementation of Converse NET version
3  * @ingroup NET
4  * contains only Ibverbs specific code for:
5  * - CmiMachineInit()
6  * - CmiCommunicationInit()
7  * - CmiNotifyStillIdle()
8  * - DeliverViaNetwork()
9  * - CommunicationServer()
10  * - CmiMachineExit()
11
12   created by 
13         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
14 */
15
16 /**
17  * @addtogroup NET
18  * @{
19  */
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #if CMK_HAS_MALLOC_H
27 #include <malloc.h>
28 #endif
29 #include <getopt.h>
30 #include <time.h>
31
32 #include <infiniband/verbs.h>
33
34 enum ibv_mtu mtu = IBV_MTU_2048;
35 static int page_size;
36 static int mtu_size;
37 static int packetSize;
38 static int dataSize;
39 static int rdma;
40 static int rdmaThreshold;
41 static int firstBinSize;
42 static int blockAllocRatio;
43 static int blockThreshold;
44
45
46 static int maxRecvBuffers;
47 static int maxTokens;
48 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
49 static int sendPacketPoolSize; /*total number of send buffers created*/
50
51 static double _startTime=0;
52 static int regCount;
53
54 static int pktCount;
55 static int msgCount;
56 static int minTokensLeft;
57
58
59 static double regTime;
60
61 static double processBufferedTime;
62 static int processBufferedCount;
63
64 #define CMK_IBVERBS_STATS 0
65 #define CMK_IBVERBS_TOKENS_FLOW 1
66 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on 
67 #define CMK_IBVERBS_DEBUG 0
68 #define CMI_DIRECT_DEBUG 0
69 #define WC_LIST_SIZE 32
70 /*#define WC_BUFFER_SIZE 100*/
71
72 #if CMK_IBVERBS_STATS
73 static int numReg=0;
74 static int numUnReg=0;
75 static int numCurReg=0;
76 static int numAlloc=0;
77 static int numFree=0;
78 static int numMultiSendUnreg=0;
79 static int numMultiSend=0;
80 static int numMultiSendFree=0;
81 #endif
82
83
84 #define INCTOKENS_FRACTION 0.04
85 #define INCTOKENS_INCREASE .50
86
87 // flag for using a pool for every thread in SMP mode
88 #if CMK_SMP
89 #define THREAD_MULTI_POOL 1
90 #endif
91
92 #if THREAD_MULTI_POOL 
93 #include "pcqueue.h"
94 PCQueue **queuePool;
95 void infi_CmiFreeDirect(void *ptr);
96 static inline void fillBufferPools();
97 #endif
98
99 #define INFIBARRIERPACKET 128
100
101 struct infiIncTokenAckPacket{
102         int a;
103 };
104
105 typedef struct {
106 char none;  
107 } CmiIdleState;
108
109
110 /********
111 ** The notify idle methods
112 ***/
113
114 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
115
116 static void CmiNotifyStillIdle(CmiIdleState *s);
117
118
119 static void CmiNotifyBeginIdle(CmiIdleState *s)
120 {
121   CmiNotifyStillIdle(s);
122 }
123
124 void CmiNotifyIdle(void) {
125   CmiNotifyStillIdle(NULL);
126 }
127
128 /***************
129 Data Structures 
130 ***********************/
131
132 /******
133         This is a header attached to the beginning of every infiniband packet
134 *******/
135 #define INFIPACKETCODE_DATA 1
136 #define INFIPACKETCODE_INCTOKENS 2
137 #define INFIRDMA_START 4
138 #define INFIRDMA_ACK 8
139 #define INFIDIRECT_REQUEST 16
140 #define INFIPACKETCODE_INCTOKENSACK 32
141 #define INFIDUMMYPACKET 64
142
143 struct infiPacketHeader{
144         char code; /* packet kind; presumably one of the INFIPACKETCODE_ / INFIRDMA_ / INFIDIRECT_ codes defined above -- TODO confirm */
145         int nodeNo; /* sending node: CmiMachineInit stores _Cmi_mynode in the template header that newPacket() copies */
146 #if     CMK_IBVERBS_DEBUG
147         int psn; /* debug builds only: stamped from the per-node psn counter when a packet is enqueued */
148 #endif  
149 };
150
151 /*
152         Types of rdma packets
153 */
154 #define INFI_MESG 1 
155 #define INFI_DIRECT 2
156
157 struct infiRdmaPacket{
158         int fromNodeNo; /* presumably the node that issued the rdma request -- TODO confirm against the sender */
159         int type; /* one of the rdma packet types listed above (INFI_MESG, INFI_DIRECT) */
160         struct ibv_mr key; /* memory-region descriptor stored by value */
161         struct ibv_mr *keyPtr; /* memory-region descriptor stored by pointer; NOTE(review): relationship to 'key' is not visible here */
162         int remoteSize;
163         char *remoteBuf;
164         void *localBuffer;
165         OutgoingMsg ogm;
166         struct infiRdmaPacket *next,*prev; /* doubly-linked list links; presumably for the bufferedRdmaAcks / bufferedRdmaRequests lists in infiContext -- TODO confirm */
167 };
168
169
170 /** Represents a buffer that is used to receive messages
171 */
172 #define BUFFER_RECV 1
173 #define BUFFER_RDMA 2
174 struct infiBuffer{
175         int type; /* BUFFER_RECV or BUFFER_RDMA; allocateInfiBufferPool() marks its buffers BUFFER_RECV */
176         char *buf; /* start of this buffer; pool buffers are slices of one large allocation */
177         int size; /* size of this buffer in bytes (sizePerBuffer for pool buffers) */
178         struct ibv_mr *key; /* memory registration covering buf; shared by every buffer of a pool */
179 };
180
181
182
183 /** At the moment it is a simple pool with just a list of buffers
184         * TODO: extend it so that it becomes an element in a linked list of pools
185 */
186 struct infiBufferPool{
187         int numBuffers; /* number of entries in 'buffers' */
188         struct infiBuffer *buffers; /* array of numBuffers buffer descriptors */
189         struct infiBufferPool *next; /* set to NULL by allocateInfiBufferPool(); pools are not chained yet */
190 };
191
192 /*****
193         It is the structure for the send buffers that are used
194         to send messages to other nodes
195 ********/
196
197
198 typedef struct infiPacketStruct {       /* NOTE: infiPacket is a POINTER typedef to this struct */
199         char *buf; /* payload pointer; NULL while the packet sits on the free list */
200         int size; /* payload size; -1 while the packet is unused */
201         struct infiPacketHeader header; /* per-packet copy of context->header, sent as the first scatter/gather element */
202         struct ibv_mr *keyHeader; /* registration of the memory holding this descriptor, taken from its chunk metadata in newPacket() */
203         struct OtherNodeStruct *destNode;
204         struct infiPacketStruct *next; /* link used by the context->infiPacketFreeList free list */
205         OutgoingMsg ogm;
206         struct ibv_sge elemList[2]; /* [0] is pre-filled by newPacket() to describe 'header'; [1] is left for the sender to fill */
207         struct ibv_send_wr wr; /* pre-built signaled IBV_WR_SEND request over elemList; wr_id holds this packet's address */
208 }* infiPacket;
209
210 /*
211 typedef struct infiBufferedWCStruct{
212         struct ibv_wc wcList[WC_BUFFER_SIZE];
213         int count;
214         struct infiBufferedWCStruct *next,*prev;
215 } * infiBufferedWC;
216 */
217
218 #define BCASTLIST_SIZE 50
219
220 struct infiBufferedBcastStruct{
221         char *msg; /* presumably the buffered broadcast message -- TODO confirm */
222         int size; /* presumably the size of msg in bytes -- TODO confirm */
223         int broot; /* presumably the broadcast root, as for infiOtherNodeData.broot -- TODO confirm */
224         int asm_rank;
225         int valid; /* NOTE(review): looks like a slot-in-use flag; confirm against the code that fills bcastList */
226 };
227
228 typedef struct infiBufferedBcastPoolStruct{ /* NOTE: infiBufferedBcastPool is a POINTER typedef to this struct */
229         struct infiBufferedBcastPoolStruct *next,*prev; /* doubly-linked list links between pools */
230         struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE]; /* fixed-capacity array of buffered broadcasts */
231         int count; /* presumably the number of bcastList slots in use -- TODO confirm */
232
233 } *infiBufferedBcastPool;
234
235
236
237
238 /***
239         This structure represents the data needed by the infiniband
240         communication routines of a node
241         TODO: add locking for the smp version
242 */
243 struct infiContext {
244         struct ibv_context      *context; /* device context opened with ibv_open_device() in CmiMachineInit */
245         
246         fd_set  asyncFds; /* only referenced from commented-out code in CmiMachineInit */
247         struct timeval tmo; /* only referenced from commented-out code in CmiMachineInit */
248         
249         int ibPort; /* port chosen in CmiMachineInit: first port whose link layer is InfiniBand */
250 //      struct ibv_comp_channel *channel;
251         struct ibv_pd           *pd; /* protection domain allocated in CmiMachineInit */
252         struct ibv_cq           *sendCq; /* send completion queue, created in createLocalQps with sendCqSize entries */
253         struct ibv_cq   *recvCq; /* receive completion queue, created in createLocalQps with recvCqSize entries */
254         struct ibv_srq  *srq; /* shared receive queue; only created when there is more than one node */
255         
256         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
257                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
258                                                                                                 //when the qps are stored in the corresponding OtherNodes
259
260         struct infiAddr *localAddr; //stores the lid,qpn,psn address of our queue pairs until they are sent
261
262         infiPacket infiPacketFreeList; /* head of the free list used by MallocInfiPacket / FreeInfiPacket */
263         
264         struct infiBufferPool *recvBufferPool; /* receive buffers allocated and posted by infiPostInitialRecvs() */
265
266         struct infiPacketHeader header; /* template header (nodeNo = this node) copied into every new packet */
267
268         int srqSize; /* set to maxRecvBuffers+2 in createLocalQps */
269         int sendCqSize,recvCqSize; /* set to maxTokens+2 and maxRecvBuffers respectively in createLocalQps */
270         int tokensLeft; /* starts at maxTokens; getFreeTokens() spins in the communication server while it is 0 */
271
272         infiBufferedBcastPool bufferedBcastList; /* initialised to NULL in CmiMachineInit */
273         
274         struct infiRdmaPacket *bufferedRdmaAcks; /* initialised to NULL in CmiMachineInit */
275         
276         struct infiRdmaPacket *bufferedRdmaRequests; /* initialised to NULL in CmiMachineInit */
277 /*      infiBufferedWC infiBufferedRecvList;*/
278         
279         int insideProcessBufferedBcasts; /* initialised to 0 in CmiMachineInit; presumably a re-entrancy guard -- TODO confirm */
280 };
281
282 static struct infiContext *context = NULL;
283
284
285
286
287 /** Represents a qp used to send messages to another node
288  There is one for each remote node
289 */
290 struct infiAddr {
291         int lid,qpn,psn; /* filled in createLocalQps: port LID, queue pair number (qp_num) and a random 24-bit initial packet sequence number */
292 };
293
294 /**
295  Stored in the OtherNode structure in machine-dgram.c 
296  Store the per node data for ibverbs layer
297 */
298 enum { INFI_HEADER_DATA=21,INFI_DATA};
299
300 struct infiOtherNodeData{
301         struct ibv_qp *qp ; /* queue pair to this node, taken from context->qp[node] in initInfiOtherNodeData() */
302         int state;// does it expect a packet with a header (first packet) or one without
303         int totalTokens; /* NOTE(review): not assigned in initInfiOtherNodeData (assignment is commented out there) */
304         int tokensLeft; /* NOTE(review): not assigned in initInfiOtherNodeData; flow control visible here uses context->tokensLeft */
305         int nodeNo; /* number of the remote node this record describes */
306         
307         int postedRecvs; /* NOTE(review): not assigned in initInfiOtherNodeData (assignment is commented out there) */
308         int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
309 #if     CMK_IBVERBS_DEBUG       
310         int psn; /* debug builds only: incremented and stamped into each outgoing packet header */
311         int recvPsn; /* debug builds only: presumably the receive-side counterpart of psn -- TODO confirm */
312 #endif  
313 };
314
315
316 /********************************
317 Memory management structures and types
318 *****************/
319
320 struct infiCmiChunkHeaderStruct;
321
322 typedef struct infiCmiChunkMetaDataStruct {
323         struct ibv_mr *key; /* memory registration of the chunk; deregistered by infi_unregAndFreeMeta() for INFIMULTIPOOL metadata */
324         int poolIdx; /* pool identifier; the special value INFIMULTIPOOL marks metadata that infi_unregAndFreeMeta() frees */
325         void *nextBuf; /* presumably the next free buffer of the same pool -- TODO confirm */
326         struct infiCmiChunkHeaderStruct *owner;
327         int count;
328
329 #if THREAD_MULTI_POOL
330         int parentPe;                                           // the PE that allocated the buffer and must release it
331 #endif
332 } infiCmiChunkMetaData;
333
334
335
336
337 #define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)
338
339 typedef struct {
340         int size;//without infiCmiChunkHeader
341         void *startBuf; /* presumably the head of this pool's list of free buffers -- TODO confirm */
342         int count; /* presumably the number of buffers currently held in the pool -- TODO confirm */
343 } infiCmiChunkPool;
344
345 #define INFINUMPOOLS 20
346 #define INFIMAXPERPOOL 400
347 #define INFIMULTIPOOL 0xDEAFB00D
348
349 #if THREAD_MULTI_POOL
350 static infiCmiChunkPool **infiCmiChunkPools;
351 //TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
352 #else
353 static infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
354 #endif
355
356 static void initInfiCmiChunkPools();
357
358
359 static inline infiPacket newPacket(){
360         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
361         pkt->size = -1;
362         pkt->header = context->header;
363         pkt->next = NULL;
364         pkt->destNode = NULL;
365         pkt->keyHeader = METADATAFIELD(pkt)->key;
366         pkt->ogm=NULL;
367         CmiAssert(pkt->keyHeader!=NULL);
368         pkt->buf=NULL;
369         
370         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
371         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
372         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
373         
374         pkt->wr.wr_id = (uint64_t)pkt;
375         pkt->wr.sg_list = &(pkt->elemList[0]);
376         pkt->wr.num_sge = 2;
377         pkt->wr.opcode = IBV_WR_SEND;
378         pkt->wr.send_flags = IBV_SEND_SIGNALED;
379         pkt->wr.next = NULL;
380         
381         return pkt;
382 };
383
/*
 * Return a packet to the head of context->infiPacketFreeList after
 * resetting its size/ogm/buf fields.
 *
 * The argument is evaluated exactly once and may be any expression of type
 * infiPacket (e.g. *ptr or arr[i++]).  The expansion stays a plain brace
 * block, as before, so existing call sites keep compiling with or without
 * a trailing semicolon.
 */
#define FreeInfiPacket(pkt){ \
        infiPacket freedInfiPkt_ = (pkt); \
        freedInfiPkt_->size = -1;\
        freedInfiPkt_->ogm=NULL;\
        freedInfiPkt_->buf=NULL;\
        freedInfiPkt_->next = context->infiPacketFreeList; \
        context->infiPacketFreeList = freedInfiPkt_; \
}
391
/*
 * Obtain a send packet: pop the head of context->infiPacketFreeList, or
 * create a fresh one with newPacket() when the list is empty, and store it
 * in the lvalue 'pkt'.
 *
 * The temporary has a deliberately unusual name so that a caller variable
 * (for instance one called 'p') is never shadowed inside the expansion.
 * The expansion stays a plain brace block, as before, so existing call
 * sites keep compiling with or without a trailing semicolon.
 */
#define MallocInfiPacket(pkt) { \
        infiPacket mallocedInfiPkt_ = context->infiPacketFreeList; \
        if(mallocedInfiPkt_ == NULL){ mallocedInfiPkt_ = newPacket();} \
                 else{context->infiPacketFreeList = mallocedInfiPkt_->next; } \
        (pkt) = mallocedInfiPkt_;\
}
398
399
400
401 void infi_unregAndFreeMeta(void *md)
402 {
403   if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL))
404     {
405       int unregstat=ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
406       CmiAssert(unregstat==0);
407       free(((infiCmiChunkMetaData *)md));
408 #if CMK_IBVERBS_STATS
409       numUnReg++;
410       numCurReg--;
411       numMultiSendUnreg++;
412 #endif
413     }
414 }
415
416
417 /******************CmiMachineInit and its helper functions*/
418 static inline int pollSendCq(const int toBuffer);
419
420 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
421 static uint16_t getLocalLid(struct ibv_context *context, int port);
422 static int  checkQp(struct ibv_qp *qp){
423         struct ibv_qp_attr attr;
424         struct ibv_qp_init_attr init_attr;
425                  
426         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
427         if(attr.cur_qp_state != IBV_QPS_RTS){
428                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
429                 return 0;
430         }
431         return 1;
432 }
433 static void checkAllQps(){
434         int i;
435         for(i=0;i<_Cmi_numnodes;i++){
436                 if(i != _Cmi_mynode){
437                         if(!checkQp(nodes[i].infiData->qp)){
438                                 pollSendCq(0);
439                                 CmiAssert(0);
440                         }
441                 }
442         }
443 }
444
445 #if CMK_IBVERBS_FAST_START
446 static void send_partial_init();
447 #endif
448
449 static void CmiMachineInit(char **argv){
450         struct ibv_device **devList;
451         struct ibv_device *dev;
452         int ibPort;
453         int i;
454         int calcMaxSize;
455         infiPacket *pktPtrs;
456         struct infiRdmaPacket **rdmaPktPtrs;
457         int num_devices, idev;
458
459         MACHSTATE(3,"CmiMachineInit {");
460         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
461         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
462         
463         //TODO: make the device and ibport configureable by commandline parameter
464         //Check example for how to do that
465         devList =  ibv_get_device_list(&num_devices);
466         CmiAssert(num_devices > 0);
467         CmiAssert(devList != NULL);
468
469         context = (struct infiContext *)malloc(sizeof(struct infiContext));
470         MACHSTATE1(3,"context allocated %p",context);
471         
472         //localAddr will store the local addresses of all the qps
473         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
474         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
475
476         idev = 0;
477
478         // try all devices, can't assume device 0 is IB, it may be ethernet
479 loop:
480         dev = devList[idev];
481         CmiAssert(dev != NULL);
482
483         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
484
485         //the context for this infiniband device 
486         context->context = ibv_open_device(dev);
487         CmiAssert(context->context != NULL);
488
489         // test ibPort
490         int MAXPORT = 8;
491         for (ibPort = 1; ibPort < MAXPORT; ibPort++) {
492           struct ibv_port_attr attr;
493           if (ibv_query_port(context->context, ibPort, &attr) != 0) continue;
494           if (attr.link_layer == IBV_LINK_LAYER_INFINIBAND)  break;
495           
496         }
497         if (ibPort == MAXPORT) {
498           if (++idev == num_devices)
499             CmiAbort("No valid IB port found!");
500           else
501             goto loop;
502         }
503         context->ibPort = ibPort;
504         MACHSTATE1(3,"use port %d", ibPort);
505         
506         MACHSTATE1(3,"device opened %p",context->context);
507
508 /*      FD_ZERO(&context->asyncFds);
509         FD_SET(context->context->async_fd,&context->asyncFds);
510         context->tmo.tv_sec=0;
511         context->tmo.tv_usec=0;
512         
513         MACHSTATE(3,"asyncFds zeroed and set");*/
514
515         //protection domain
516         context->pd = ibv_alloc_pd(context->context);
517         CmiAssert(context->pd != NULL);
518         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
519
520   /******** At this point we know that this node is more or less serviceable
521         So, this is a good point for sending the partial init message for the fast
522         start case
523         Moreover, no work dependent on the number of nodes has started yet.
524         ************/
525
526 #if CMK_IBVERBS_FAST_START
527   send_partial_init();
528 #endif
529
530
531         context->header.nodeNo = _Cmi_mynode;
532
533         mtu_size=1200;
534         packetSize = mtu_size*4;
535         dataSize = packetSize-sizeof(struct infiPacketHeader);
536         
537         calcMaxSize=8000;
538 /*      if(_Cmi_numnodes*50 > calcMaxSize){
539                 calcMaxSize = _Cmi_numnodes*50;
540                 if(calcMaxSize > 10000){
541                         calcMaxSize = 10000;
542                 }
543         }*/
544 //      maxRecvBuffers=80;
545         maxRecvBuffers=calcMaxSize;
546         maxTokens = maxRecvBuffers;
547 //      maxTokens = 80;
548         context->tokensLeft=maxTokens;
549         context->qp=NULL;
550         //tokensPerProcessor=4;
551         if(_Cmi_numnodes > 1){
552 #if !CMK_IBVERBS_FAST_START
553                 /* a barrier to make sure all nodes initialized the device */
554                 ChMessage msg;
555                 ctrl_sendone_nolock("barrier",NULL,0,NULL,0);
556                 ChMessage_recv(Cmi_charmrun_fd,&msg);
557 #endif
558                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
559         }
560         
561         if (Cmi_charmrun_fd == -1) return;
562         
563         //TURN ON RDMA
564         rdma=1;
565 //      rdmaThreshold=32768;
566         rdmaThreshold=22000;
567         firstBinSize = 120;
568         CmiAssert(rdmaThreshold > firstBinSize);
569         /*      blockAllocRatio=16;
570                 blockThreshold=8;*/
571
572         blockAllocRatio=32;
573         blockThreshold=8;
574
575
576
577 #if !THREAD_MULTI_POOL
578         initInfiCmiChunkPools();
579 #endif
580
581         /*create the pool of send packets*/
582         sendPacketPoolSize = maxTokens/2;       
583         if(sendPacketPoolSize > 2000){
584                 sendPacketPoolSize = 2000;
585         }
586         
587         context->infiPacketFreeList=NULL;
588         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
589
590         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
591 #if !THREAD_MULTI_POOL
592         for(i=0;i<sendPacketPoolSize;i++){
593                 MallocInfiPacket(pktPtrs[i]);   
594         }
595
596         for(i=0;i<sendPacketPoolSize;i++){
597                 FreeInfiPacket(pktPtrs[i]);     
598         }
599         free(pktPtrs);
600 #endif
601         
602         context->bufferedBcastList=NULL;
603         context->bufferedRdmaAcks = NULL;
604         context->bufferedRdmaRequests = NULL;
605         context->insideProcessBufferedBcasts=0;
606         
607         
608         if(rdma){
609 /*              int numPkts;
610                 int k;
611                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
612                         numPkts = _Cmi_numnodes*4;
613                 }else{
614                         numPkts = maxRecvBuffers/4;
615                 }
616                 
617                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
618                 for(k=0;k<numPkts;k++){
619                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
620                 }
621                 
622                 for(k=0;k<numPkts;k++){
623                         CmiFree(rdmaPktPtrs[k]);
624                 }
625                 free(rdmaPktPtrs);*/
626         }
627         
628 /*      context->infiBufferedRecvList = NULL;*/
629 #if CMK_IBVERBS_STATS   
630         regCount =0;
631         regTime  = 0;
632
633         pktCount=0;
634         msgCount=0;
635
636         processBufferedCount=0;
637         processBufferedTime=0;
638
639         minTokensLeft = maxTokens;
640 #endif  
641
642         
643
644         MACHSTATE(3,"} CmiMachineInit");
645 }
646
/**
 * Communication-layer initialisation hook.
 *
 * In THREAD_MULTI_POOL builds the chunk pools are initialised here (and the
 * buffer pools filled) rather than in CmiMachineInit(); in every other
 * build this function does nothing.
 *
 * @param argv command-line arguments (not used here)
 */
void CmiCommunicationInit(char **argv)
{
#if THREAD_MULTI_POOL
        initInfiCmiChunkPools();

        fillBufferPools();
#endif
}
654
655 /*********
656         Open a qp for every processor
657 *****/
658 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
659         int myLid;
660         int i;
661         
662         
663         //find my lid
664         myLid = getLocalLid(context->context,ibPort);
665         
666         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
667
668         context->sendCqSize = maxTokens+2;
669         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
670         CmiAssert(context->sendCq != NULL);
671         
672         MACHSTATE1(3,"sendCq created %p",context->sendCq);
673         
674         
675         context->recvCqSize = maxRecvBuffers;
676         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
677         
678         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
679         CmiAssert(context->recvCq != NULL);
680         
681         //array of queue pairs
682
683         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
684
685         if(numNodes > 1)
686         {
687                 context->srqSize = (maxRecvBuffers+2);
688                 struct ibv_srq_init_attr srqAttr = {
689                         .attr = {
690                         .max_wr  = context->srqSize,
691                         .max_sge = 1
692                         }
693                 };
694                 context->srq = ibv_create_srq(context->pd,&srqAttr);
695                 CmiAssert(context->srq != NULL);
696         
697                 struct ibv_qp_init_attr initAttr = {
698                         .qp_type = IBV_QPT_RC,
699                         .send_cq = context->sendCq,
700                         .recv_cq = context->recvCq,
701                         .srq             = context->srq,
702                         .sq_sig_all = 0,
703                         .qp_context = NULL,
704                         .cap     = {
705                                 .max_send_wr  = maxTokens,
706                                 .max_send_sge = 2,
707                         },
708                 };
709                 struct ibv_qp_attr attr;
710
711                 attr.qp_state        = IBV_QPS_INIT;
712                 attr.pkey_index      = 0;
713                 attr.port_num        = ibPort;
714                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
715
716 /*              MACHSTATE1(3,"context->pd %p",context->pd);
717                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
718                 MACHSTATE1(3,"TEST QP %p",qp);*/
719
720                 for( i=1;i<numNodes;i++){
721                         int n = (myNode + i)%numNodes;
722                         if(n == myNode){
723                         }else{
724                                 localAddr[n].lid = myLid;
725                                 context->qp[n] = ibv_create_qp(context->pd,&initAttr);
726                         
727                                 MACHSTATE2(3,"qp[%d] created %p",n,context->qp[n]);
728                                 CmiAssert(context->qp[n] != NULL);
729                         
730                                 ibv_modify_qp(context->qp[n], &attr,
731                                           IBV_QP_STATE              |
732                                           IBV_QP_PKEY_INDEX         |
733                                         IBV_QP_PORT               |
734                                         IBV_QP_ACCESS_FLAGS);           
735
736                                 localAddr[n].qpn = context->qp[n]->qp_num;
737                                 localAddr[n].psn = lrand48() & 0xffffff;
738                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",n,localAddr[n].lid,localAddr[n].qpn,localAddr[n].psn);
739                         }
740                 }
741         }
742         MACHSTATE(3,"qps created");
743 };
744
745 void copyInfiAddr(ChInfiAddr *qpList){
746         int qpListIdx=0;
747         int i;
748         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
749         for(i=0;i<_Cmi_numnodes;i++){
750                 if(i == _Cmi_mynode){
751                 }else{
752                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
753                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
754                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
755                         qpListIdx++;
756                 }
757         }
758 }
759
760
761 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
762         struct ibv_port_attr attr;
763
764         if (ibv_query_port(dev_context, port, &attr))
765                 return 0;
766
767         return attr.lid;
768 }
769
770 /**************** END OF CmiMachineInit and its helper functions*/
771
772 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
773 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
774
775 /* Initial the infiniband specific data for a remote node
776         1. connect the qp and store it in and return it
777 **/
778 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
779         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
780         int err;
781         ret->state = INFI_HEADER_DATA;
782         ret->qp = context->qp[node];
783 //      ret->totalTokens = tokensPerProcessor;
784 //      ret->tokensLeft = tokensPerProcessor;
785         ret->nodeNo = node;
786 //      ret->postedRecvs = tokensPerProcessor;
787 #if     CMK_IBVERBS_DEBUG       
788         ret->psn = 0;
789         ret->recvPsn = 0;
790 #endif
791         
792         struct ibv_qp_attr attr = {
793                 .qp_state               = IBV_QPS_RTR,
794                 .path_mtu               = mtu,
795                 .dest_qp_num            = addr[1],
796                 .rq_psn                 = addr[2],
797                 .max_dest_rd_atomic     = 1,
798                 .min_rnr_timer          = 31,
799                 .ah_attr                = {
800                         .is_global      = 0,
801                         .dlid           = addr[0],
802                         .sl             = 0,
803                         .src_path_bits  = 0,
804                         .port_num       = context->ibPort
805                 }
806         };
807         
808         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
809         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
810         
811         if (err = ibv_modify_qp(ret->qp, &attr,
812           IBV_QP_STATE              |
813           IBV_QP_AV                 |
814           IBV_QP_PATH_MTU           |
815           IBV_QP_DEST_QPN           |
816           IBV_QP_RQ_PSN             |
817           IBV_QP_MAX_DEST_RD_ATOMIC |
818           IBV_QP_MIN_RNR_TIMER)) {
819                         MACHSTATE1(3,"ERROR %d",err);
820                         CmiAbort("failed to change qp state to RTR");
821         }
822
823         MACHSTATE(3,"qp state changed to RTR");
824         
825         attr.qp_state       = IBV_QPS_RTS;
826         attr.timeout        = 26;
827         attr.retry_cnt      = 20;
828         attr.rnr_retry      = 7;
829         attr.sq_psn         = context->localAddr[node].psn;
830         attr.max_rd_atomic  = 1;
831
832         
833         if (ibv_modify_qp(ret->qp, &attr,
834           IBV_QP_STATE              |
835           IBV_QP_TIMEOUT            |
836           IBV_QP_RETRY_CNT          |
837           IBV_QP_RNR_RETRY          |
838           IBV_QP_SQ_PSN             |
839           IBV_QP_MAX_QP_RD_ATOMIC)) {
840                         fprintf(stderr, "Failed to modify QP to RTS\n");
841                         exit(1);
842         }
843         MACHSTATE(3,"qp state changed to RTS");
844
845         MACHSTATE(3,"} initInfiOtherNodeData");
846         return ret;
847 }
848
849
850 void    infiPostInitialRecvs(){
851         //create the pool and post the receives
852         int numPosts;
853 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
854                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
855         }else{
856                 numPosts = maxRecvBuffers;
857         }*/
858
859         if(_Cmi_numnodes > 1){
860                 numPosts = maxRecvBuffers;
861         }else{
862                 numPosts = 0;
863         }
864         if(numPosts > 0){
865                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
866                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
867         }
868
869
870         if (context->qp) {
871           free(context->qp);
872           context->qp = NULL;
873         }
874         free(context->localAddr);
875         context->localAddr= NULL;
876 }
877
878 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
879         int numBuffers;
880         int i;
881         int bigSize;
882         char *bigBuf;
883         struct infiBufferPool *ret;
884         struct ibv_mr *bigKey;
885
886         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
887
888         page_size = sysconf(_SC_PAGESIZE);
889         ret = malloc(sizeof(struct infiBufferPool));
890         ret->next = NULL;
891         numBuffers=ret->numBuffers = numRecvs;
892         
893         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
894         
895         bigSize = numBuffers*sizePerBuffer;
896         bigBuf=malloc(bigSize);
897         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
898 #if CMK_IBVERBS_STATS
899         numCurReg++;
900         numReg++;
901 #endif
902
903         CmiAssert(bigKey != NULL);
904         
905         for(i=0;i<numBuffers;i++){
906                 struct infiBuffer *buffer =  &(ret->buffers[i]);
907                 buffer->type = BUFFER_RECV;
908                 buffer->size = sizePerBuffer;
909                 
910
911                 buffer->buf = &bigBuf[i*sizePerBuffer];
912                 buffer->key = bigKey;
913
914                 if(buffer->key == NULL){
915                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
916                         CmiAssert(buffer->key != NULL);
917                 }
918         }
919         return ret;
920 };
921
922
923
924 /**
925          Post the buffers as recv work requests
926 */
927 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
928         int j,err;
929         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
930         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
931         struct ibv_recv_wr *bad_wr;
932         
933         int startBufferIdx=0;
934         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
935         for(j=0;j<numRecvs;j++){
936                 
937                 
938                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
939                 sgElements[j].length = sizePerBuffer;
940                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
941                 
942                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
943                 workRequests[j].sg_list = &sgElements[j];
944                 workRequests[j].num_sge = 1;
945                 if(j != numRecvs-1){
946                         workRequests[j].next = &workRequests[j+1];
947                 }
948                 
949         }
950         workRequests[numRecvs-1].next = NULL;
951         MACHSTATE(3,"About to call ibv_post_srq_recv");
952         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
953                 CmiAssert(0);
954         }
955
956         free(workRequests);
957         free(sgElements);
958 }
959
960
961
962
963 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
964
/**
 * Machine-layer exit hook.  Only does work when CMK_IBVERBS_STATS is
 * enabled, in which case it prints the registration, message, packet and
 * token counters of this node.
 */
void CmiMachineExit()
{
#if CMK_IBVERBS_STATS
	/* one line per node, prefixed with the node number */
	printf("[%d] numReg %d numUnReg %d numCurReg %d msgCount %d pktCount %d "
	       "packetSize %d total Time %.6lf s processBufferedCount %d "
	       "processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",
	       _Cmi_mynode,
	       numReg, numUnReg, numCurReg,
	       msgCount, pktCount, packetSize,
	       CmiTimer(),
	       processBufferedCount, processBufferedTime,
	       maxTokens, context->tokensLeft);
#endif
}
972 static void ServiceCharmrun_nolock();
973
974 static void CmiNotifyStillIdle(CmiIdleState *s) {
975 #if CMK_SMP
976         CmiCommLock();
977 /*      if(where == COMM_SERVER_FROM_SMP)*/
978 #endif
979 /*              ServiceCharmrun_nolock();*/
980
981         CommunicationServer_nolock(0);
982 #if CMK_SMP
983         CmiCommUnlock();
984 #endif
985 }
986
987 static inline void increaseTokens(OtherNode node);
988
989 static inline int pollRecvCq(const int toBuffer);
990 static inline int pollSendCq(const int toBuffer);
991
992
/**
 * Wait until at least one send token is available.
 *
 * With CMK_IBVERBS_TOKENS_FLOW enabled, and context->tokensLeft at zero,
 * the communication server is run in buffering mode (toBuffer == 1) until
 * the count becomes non-zero.  Without token flow control this is a no-op.
 * The count consulted is the global one in context; infiData is unused.
 */
static inline void getFreeTokens(struct infiOtherNodeData *infiData){
#if CMK_IBVERBS_TOKENS_FLOW
	if(context->tokensLeft != 0){
		/* tokens available, nothing to wait for */
		return;
	}
	MACHSTATE(3,"GET FREE TOKENS {{{");
	do{
		CommunicationServer_nolock(1);
	}while(context->tokensLeft == 0);
	MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
#else
	return;
#endif
}
1009
1010
1011 /**
1012         Packetize this data and send it
1013 **/
1014
1015
1016
1017 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
1018         int incTokens=0;
1019         int retval;
1020 #if     CMK_IBVERBS_DEBUG
1021         packet->header.psn = (++node->infiData->psn);
1022 #endif  
1023
1024
1025
1026         packet->elemList[1].addr = (uintptr_t)packet->buf;
1027         packet->elemList[1].length = size;
1028         packet->elemList[1].lkey = dataKey->lkey;
1029         
1030         
1031         packet->destNode = node;
1032         
1033 #if CMK_IBVERBS_STATS   
1034         pktCount++;
1035 #endif  
1036         
1037         getFreeTokens(node->infiData);
1038
1039 #if CMK_IBVERBS_INCTOKENS       
1040         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
1041                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
1042                 incTokens=1;
1043         }
1044 #endif
1045 /*
1046         if(!checkQp(node->infiData->qp)){
1047                 pollSendCq(1);
1048                 CmiAssert(0);
1049         }*/
1050
1051         struct ibv_send_wr *bad_wr=NULL;
1052         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1053                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
1054                 CmiAssert(0);
1055         }
1056 #if     CMK_IBVERBS_TOKENS_FLOW
1057         context->tokensLeft--;
1058 #if     CMK_IBVERBS_STATS
1059         if(context->tokensLeft < minTokensLeft){
1060                 minTokensLeft = context->tokensLeft;
1061         }
1062 #endif
1063 #endif
1064
1065 /*      if(!checkQp(node->infiData->qp)){
1066                 pollSendCq(1);
1067                 CmiAssert(0);
1068         }*/
1069
1070 #if CMK_IBVERBS_INCTOKENS       
1071         if(incTokens){
1072                 increaseTokens(node);
1073         }
1074 #endif
1075
1076
1077 #if     CMK_IBVERBS_DEBUG
1078         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1079 #else
1080         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1081 #endif
1082
1083 };
1084
1085
1086 static void inline EnqueueDummyPacket(OtherNode node,int size){
1087         infiPacket packet;
1088         MallocInfiPacket(packet);
1089         packet->size = size;
1090         packet->buf = CmiAlloc(size);
1091         
1092         packet->header.code = INFIDUMMYPACKET;
1093
1094         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1095         
1096         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1097         EnqueuePacket(node,packet,size,key);
1098 }
1099
1100
1101
1102
1103
1104
1105 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1106         infiPacket packet;
1107         MallocInfiPacket(packet);
1108         packet->size = size;
1109         packet->buf=data;
1110         
1111         //the nodeNo is added at time of packet allocation
1112         packet->header.code = INFIPACKETCODE_DATA;
1113         
1114         ogm->refcount++;
1115         packet->ogm = ogm;
1116         
1117         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1118         CmiAssert(key != NULL);
1119         
1120         EnqueuePacket(node,packet,size,key);
1121 };
1122
1123 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1124 static inline void processAllBufferedMsgs();
1125
1126 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1127         int size; char *data;
1128 //      processAllBufferedMsgs();
1129
1130         
1131         ogm->refcount++;
1132   size = ogm->size;
1133   data = ogm->data;
1134
1135 #if CMK_IBVERBS_STATS   
1136         msgCount++;
1137 #endif
1138
1139         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1140         //First packet has dgram header, other packets dont
1141         
1142   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1143         
1144         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1145
1146         if(rdma && size > rdmaThreshold){
1147                         EnqueueRdmaPacket(ogm,node);
1148         }else{
1149         
1150                 while(size > dataSize){
1151                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1152                         size -= dataSize;
1153                         data += dataSize;
1154                 }
1155                 if(size > 0){
1156                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1157                 }
1158         }
1159 //#if   !CMK_SMP
1160         processAllBufferedMsgs();
1161 //#endif
1162         ogm->refcount--;
1163         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1164 }
1165
1166
1167 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1168         infiPacket packet;
1169
1170         ogm->refcount++;
1171         
1172         MallocInfiPacket(packet);
1173  
1174  {
1175                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1176
1177                 
1178                 packet->size = sizeof(struct infiRdmaPacket);
1179                 packet->buf = (char *)rdmaPacket;
1180                 
1181                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1182
1183                 CmiAssert(key!=NULL);
1184
1185                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1186                 
1187                 packet->header.code = INFIRDMA_START;
1188                 packet->header.nodeNo = _Cmi_mynode;
1189                 packet->ogm = NULL;
1190
1191                 rdmaPacket->type = INFI_MESG;
1192                 rdmaPacket->ogm = ogm;
1193                 rdmaPacket->key = *key;
1194                 rdmaPacket->keyPtr = key;
1195                 rdmaPacket->remoteBuf = ogm->data;
1196                 rdmaPacket->remoteSize = ogm->size;
1197                 
1198                 
1199                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1200                 
1201                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1202                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1203         }
1204 }
1205
1206
1207
1208 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1209 static inline void processSendWC(struct ibv_wc *sendWC);
1210 static unsigned int _count=0;
1211 extern int errno;
1212 static int _countAsync=0;
1213 static inline void processAsyncEvents(){
1214         struct ibv_async_event event;
1215         int ready;
1216         _countAsync++;
1217         if(_countAsync < 1){
1218                 return;
1219         }
1220         _countAsync=0;
1221         FD_SET(context->context->async_fd,&context->asyncFds);
1222         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1223         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1224         
1225         if(ready==0){
1226                 return;
1227         }
1228         if(ready == -1){
1229 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1230                 return;
1231         }
1232         
1233         if (ibv_get_async_event(context->context, &event)){
1234                 return;
1235                 CmiAbort("get async event failed");
1236         }
1237         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1238         ibv_ack_async_event(&event);
1239
1240         
1241 }
1242
1243 static void pollCmiDirectQ();
1244
1245 static inline  void CommunicationServer_nolock(int toBuffer) {
1246         int processed;
1247         if(_Cmi_numnodes <= 1){
1248                 pollCmiDirectQ();
1249                 return;
1250         }
1251         MACHSTATE(2,"CommServer_nolock{");
1252         
1253 //      processAsyncEvents();
1254         
1255 //      checkAllQps();
1256
1257         pollCmiDirectQ();
1258
1259         processed = pollRecvCq(toBuffer);
1260         
1261
1262         processed += pollSendCq(toBuffer);
1263         
1264         if(toBuffer == 0){
1265 //              if(processed != 0)
1266                         processAllBufferedMsgs();
1267         }
1268         
1269 //      checkAllQps();
1270 //      _count--;
1271
1272         MACHSTATE(2,"} CommServer_nolock ne");
1273         
1274 }
1275 /*
1276 static inline infiBufferedWC createInfiBufferedWC(){
1277         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1278         ret->count = 0;
1279         ret->next = ret->prev =NULL;
1280         return ret;
1281 }*/
1282
1283 /****
1284         The buffered recvWC are stored in a doubly linked list of 
1285         arrays or blocks of wcs.
1286         To keep the average insert cost low, a new block is added 
1287         to the top of the list. (resulting in a reverse seq of blocks)
1288         Within a block however wc are stored in a sequence
1289 *****/
1290 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1291         infiBufferedWC block;
1292         MACHSTATE(3,"Insert Buffered Recv called");
1293         if( context->infiBufferedRecvList ==NULL){
1294                 context->infiBufferedRecvList = createInfiBufferedWC();
1295                 block = context->infiBufferedRecvList;
1296         }else{
1297                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1298                         block = createInfiBufferedWC();
1299                         context->infiBufferedRecvList->prev = block;
1300                         block->next = context->infiBufferedRecvList;
1301                         context->infiBufferedRecvList = block;
1302                 }else{
1303                         block = context->infiBufferedRecvList;
1304                 }
1305         }
1306         
1307         block->wcList[block->count] = *wc;
1308         block->count++;
1309 };
1310 */
1311
1312
1313 /********
1314 go through the blocks of bufferedWC. Process the last block first.
1315 Then the next one and so on. (Processing within a block should happen
1316 in sequence).
1317 Leave the last block in place to avoid having to allocate again
1318 ******/
1319 /*static inline void processBufferedRecvList(){
1320         infiBufferedWC start;
1321         start = context->infiBufferedRecvList;
1322         while(start->next != NULL){
1323                 start = start->next;
1324         }
1325         while(start != NULL){
1326                 int i=0;
1327                 infiBufferedWC tmp;
1328                 for(i=0;i<start->count;i++){
1329                         processRecvWC(&start->wcList[i]);
1330                 }
1331                 if(start != context->infiBufferedRecvList){
1332                         //not the first one
1333                         tmp = start;
1334                         start = start->prev;
1335                         free(tmp);
1336                         start->next = NULL;
1337                 }else{
1338                         start = start->prev;
1339                 }
1340         }
1341         context->infiBufferedRecvList->next = NULL;
1342         context->infiBufferedRecvList->prev = NULL;
1343         context->infiBufferedRecvList->count = 0;
1344 }
1345 */
1346
1347 static inline int pollRecvCq(const int toBuffer){
1348         int i;
1349         int ne;
1350         struct ibv_wc wc[WC_LIST_SIZE];
1351         
1352         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1353         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1354 //      CmiAssert(ne >=0);
1355         
1356         if(ne != 0){
1357                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1358         }
1359         
1360         for(i=0;i<ne;i++){
1361                 if(wc[i].status != IBV_WC_SUCCESS){
1362                         CmiAssert(0);
1363                 }
1364                 switch(wc[i].opcode){
1365                         case IBV_WC_RECV:
1366                                         processRecvWC(&wc[i],toBuffer);
1367                                 break;
1368                         default:
1369                                 CmiAbort("Wrong type of work completion object in recvq");
1370                                 break;
1371                 }
1372                         
1373         }
1374         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1375         return ne;
1376
1377 }
1378
1379 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1380
1381 static inline int pollSendCq(const int toBuffer){
1382         int i;
1383         int ne;
1384         struct ibv_wc wc[WC_LIST_SIZE];
1385
1386         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1387 //      CmiAssert(ne >=0);
1388         
1389         
1390         for(i=0;i<ne;i++){
1391                 if(wc[i].status != IBV_WC_SUCCESS){
1392                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1393 #if CMK_IBVERBS_STATS
1394         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1395 #endif
1396                         CmiAssert(0);
1397                 }
1398                 switch(wc[i].opcode){
1399                         case IBV_WC_SEND:{
1400                                 //message received
1401                                 processSendWC(&wc[i]);
1402                                 
1403                                 break;
1404                                 }
1405                         case IBV_WC_RDMA_READ:
1406                         {
1407 //                              processRdmaWC(&wc[i],toBuffer);
1408                                         processRdmaWC(&wc[i],1);
1409                                 break;
1410                         }
1411                         case IBV_WC_RDMA_WRITE:
1412                         {
1413                                 /*** used for CmiDirect puts 
1414                                 Nothing needs to be done on the sender side once send is done **/
1415                                 break;
1416                         }
1417                         default:
1418                                 CmiAbort("Wrong type of work completion object in recvq");
1419                                 break;
1420                 }
1421                         
1422         }
1423         return ne;
1424 }
1425
1426
1427 /******************
1428 Check the communication server socket and
1429
1430 *****************/
1431 int CheckSocketsReady(int withDelayMs)
1432 {   
1433   int nreadable;
1434   CMK_PIPE_DECL(withDelayMs);
1435
1436   CmiStdoutAdd(CMK_PIPE_SUB);
1437   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1438
1439   nreadable=CMK_PIPE_CALL();
1440   ctrlskt_ready_read = 0;
1441   dataskt_ready_read = 0;
1442   dataskt_ready_write = 0;
1443   
1444   if (nreadable == 0) {
1445     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1446     return nreadable;
1447   }
1448   if (nreadable==-1) {
1449     CMK_PIPE_CHECKERR();
1450     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1451     return CheckSocketsReady(0);
1452   }
1453   CmiStdoutCheck(CMK_PIPE_SUB);
1454   if (Cmi_charmrun_fd!=-1) 
1455           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1456   MACHSTATE(1,"} CheckSocketsReady")
1457   return nreadable;
1458 }
1459
1460
1461 /*** Service the charmrun socket
1462 *************/
1463
1464 static void ServiceCharmrun_nolock()
1465 {
1466   int again = 1;
1467   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1468   while (again)
1469   {
1470   again = 0;
1471   CheckSocketsReady(0);
1472   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1473   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1474   }
1475   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1476 }
1477
1478
1479
1480 static void CommunicationServer(int sleepTime, int where){
1481         if( where == COMM_SERVER_FROM_INTERRUPT){
1482 #if CMK_IMMEDIATE_MSG
1483                 CmiHandleImmediate();
1484 #endif
1485                 return;
1486         }
1487 #if CMK_SMP
1488         if(where == COMM_SERVER_FROM_WORKER){
1489                 return;
1490         }
1491         if(where == COMM_SERVER_FROM_SMP){
1492 #endif
1493                 ServiceCharmrun_nolock();
1494 #if CMK_SMP
1495         }
1496         CmiCommLock();
1497 #endif
1498         CommunicationServer_nolock(0);
1499 #if CMK_SMP
1500         CmiCommUnlock();
1501 #endif
1502
1503         /* when called by communication thread or in interrupt */
1504 #if CMK_IMMEDIATE_MSG
1505         if (where == COMM_SERVER_FROM_SMP) {
1506                 CmiHandleImmediate();
1507         }
1508 #endif
1509 }
1510
1511
1512 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
1513
1514
1515 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
1516
1517 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
1518         char *newmsg;
1519         
1520         MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
1521         
1522         OtherNode node = &nodes[nodeNo];
1523         newmsg = node->asm_msg;         
1524         
1525         /// This simple state machine determines if this packet marks the beginning of a new message
1526         // from another node, or if this is another in a sequence of packets
1527         switch(node->infiData->state){
1528                 case INFI_HEADER_DATA:
1529                 {
1530                         int size;
1531                         int rank, srcpe, seqno, magic, i;
1532                         unsigned int broot;
1533                         DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1534                         size = CmiMsgHeaderGetLength(msg);
1535                         MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
1536 //                      CmiAssert(size > 0);
1537 //                      CmiAssert(nodes_by_pe[srcpe] == node);
1538                         
1539 //                      CmiAssert(newmsg == NULL);
1540                         if(len > size){
1541                                 CmiPrintf("size: %d, len:%d.\n", size, len);
1542                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1543                         }
1544                         newmsg = (char *)CmiAlloc(size);
1545       _MEMCHECK(newmsg);
1546       memcpy(newmsg, msg, len);
1547       node->asm_rank = rank;
1548       node->asm_total = size;
1549       node->asm_fill = len;
1550       node->asm_msg = newmsg;
1551                         node->infiData->broot = broot;
1552                         if(len == size){
1553                                 //this is the only packet for this message 
1554                                 node->infiData->state = INFI_HEADER_DATA;
1555                         }else{
1556                                 //there are more packets following
1557                                 node->infiData->state = INFI_DATA;
1558                         }
1559                         break;
1560                 }
1561                 case INFI_DATA:
1562                 {
1563                         if(node->asm_fill + len < node->asm_total && len != dataSize){
1564                                 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1565                                 CmiAbort("packet in the middle does not have expected length");
1566                         }
1567                         if(node->asm_fill+len > node->asm_total){
1568                                 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1569                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1570                         }
1571                         //tODO: remove this
1572                         memcpy(newmsg + node->asm_fill,msg,len);
1573                         node->asm_fill += len;
1574                         if(node->asm_fill == node->asm_total){
1575                                 node->infiData->state = INFI_HEADER_DATA;
1576                         }else{
1577                                 node->infiData->state = INFI_DATA;
1578                         }
1579                         break;
1580                 }
1581         }
1582         /// if this packet was the last packet in a message ie state was 
1583         /// reset to infi_header_data
1584         
1585         if(node->infiData->state == INFI_HEADER_DATA){
1586                 int total_size = node->asm_total;
1587                 node->asm_msg = NULL;
1588                 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
1589                 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
1590         }
1591         
1592 };
1593
1594 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
1595 #if CMK_BROADCAST_SPANNING_TREE
1596         if (rank == DGRAM_BROADCAST
1597 #if CMK_NODE_QUEUE_AVAILABLE
1598           || rank == DGRAM_NODEBROADCAST
1599 #endif
1600          ){
1601                  if(toBuffer){
1602                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1603                 }else{
1604                         SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
1605                 }
1606         }
1607 #elif CMK_BROADCAST_HYPERCUBE
1608         if (rank == DGRAM_BROADCAST
1609 #if CMK_NODE_QUEUE_AVAILABLE
1610           || rank == DGRAM_NODEBROADCAST
1611 #endif
1612          ){
1613                  if(toBuffer){
1614                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1615                  }else{
1616                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
1617                 }
1618         }
1619 #endif
1620
1621         switch (rank) {
1622         case DGRAM_BROADCAST: {
1623                                 int i;
1624                                 for (i=1; i<_Cmi_mynodesize; i++){
1625                                         CmiPushPE(i, CopyMsg(newmsg, total_size));
1626                                 }
1627           CmiPushPE(0, newmsg);
1628           break;
1629       }
1630 #if CMK_NODE_QUEUE_AVAILABLE
1631         case DGRAM_NODEBROADCAST: 
1632         case DGRAM_NODEMESSAGE: {
1633           CmiPushNode(newmsg);
1634           break;
1635         }
1636 #endif
1637         default:
1638                                 {
1639                                         
1640           CmiPushPE(rank, newmsg);
1641                                 }
1642         }    /* end of switch */
1643                 if(!toBuffer){
1644 //#if !CMK_SMP          
1645                 processAllBufferedMsgs();
1646 //#endif
1647                 }
1648 }
1649
1650
1651 static inline void increasePostedRecvs(int nodeNo);
1652 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1653 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1654
1655 //struct infiDirectRequestPacket;
1656 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1657
1658 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1659         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1660         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1661         int nodeNo = header->nodeNo;
1662 #if     CMK_IBVERBS_DEBUG
1663         OtherNode node = &nodes[nodeNo];
1664 #endif
1665         
1666         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1667 #if     CMK_IBVERBS_DEBUG
1668         /*
1669         if(node->infiData->recvPsn == 0){
1670                 node->infiData->recvPsn = header->psn;
1671         }else{
1672                         CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1673            *            node->infiData->recvPsn++; 
1674         }
1675         */
1676         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1677 #else
1678         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1679 #endif
1680         
1681         if(header->code & INFIPACKETCODE_DATA){
1682                         
1683                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1684         }
1685         if(header->code & INFIDUMMYPACKET){
1686                 MACHSTATE(3,"Dummy packet");
1687         }
1688         if(header->code & INFIBARRIERPACKET){
1689                 MACHSTATE(3,"Barrier packet");
1690                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1691         }
1692
1693 #if CMK_IBVERBS_INCTOKENS       
1694         if(header->code & INFIPACKETCODE_INCTOKENS){
1695                 increasePostedRecvs(nodeNo);
1696         }
1697 #endif  
1698         if(rdma && header->code & INFIRDMA_START){
1699                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1700 //              if(toBuffer){
1701                         //TODO: make a function of this and use for both acks and requests
1702                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1703                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1704                         *copyPacket = *rdmaPacket;
1705                         copyPacket->fromNodeNo = nodeNo;
1706                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1707                         context->bufferedRdmaRequests = copyPacket;
1708                         copyPacket->next = tmp;
1709                         copyPacket->prev = NULL;
1710                         if(tmp != NULL){
1711                                 tmp->prev = copyPacket;
1712                         }
1713 /*              }else{
1714                         processRdmaRequest(rdmaPacket,nodeNo,0);
1715                 }*/
1716         }
1717         if(rdma && header->code & INFIRDMA_ACK){
1718                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1719                 processRdmaAck(rdmaPacket);
1720         }
1721 /*      if(header->code & INFIDIRECT_REQUEST){
1722                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1723                 processDirectRequest(directRequestPacket);
1724         }*/
1725         {
1726                 struct ibv_sge list = {
1727                         .addr   = (uintptr_t) buffer->buf,
1728                         .length = buffer->size,
1729                         .lkey   = buffer->key->lkey
1730                 };
1731         
1732                 struct ibv_recv_wr wr = {
1733                         .wr_id = (uint64_t)buffer,
1734                         .sg_list = &list,
1735                         .num_sge = 1,
1736                         .next = NULL
1737                 };
1738                 struct ibv_recv_wr *bad_wr;
1739         
1740                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1741                         CmiAssert(0);
1742                 }
1743         }
1744
1745 };
1746
1747
1748
1749
1750 static inline  void processSendWC(struct ibv_wc *sendWC){
1751
1752         infiPacket packet = (infiPacket )sendWC->wr_id;
1753 #if CMK_IBVERBS_TOKENS_FLOW
1754 //      packet->destNode->infiData->tokensLeft++;
1755         context->tokensLeft++;
1756 #endif
1757
1758         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1759         if(packet->ogm != NULL){
1760                 packet->ogm->refcount--;
1761                 if(packet->ogm->refcount == 0){
1762                         GarbageCollectMsg(packet->ogm); 
1763                 }
1764         }else{
1765                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1766                    if (packet->buf) CmiFree(packet->buf);  /* gzheng */
1767                 }
1768         }
1769
1770         FreeInfiPacket(packet);
1771 };
1772
1773
1774
1775 /********************************************************************/
1776 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1777         int nodeNo = fromNodeNo;
1778         OtherNode node = &nodes[nodeNo];
1779         struct infiRdmaPacket *rdmaPacket;
1780
1781         getFreeTokens(node->infiData);
1782 #if CMK_IBVERBS_TOKENS_FLOW
1783 //      node->infiData->tokensLeft--;
1784         context->tokensLeft--;
1785 #if     CMK_IBVERBS_STATS
1786         if(context->tokensLeft < minTokensLeft){
1787                 minTokensLeft = context->tokensLeft;
1788         }
1789 #endif
1790 #endif
1791         
1792         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1793 //      CmiAssert(buffer != NULL);
1794         
1795         
1796         if(isBuffered){
1797                 rdmaPacket = _rdmaPacket;
1798         }else{
1799                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1800                 *rdmaPacket = *_rdmaPacket;
1801         }
1802
1803
1804         rdmaPacket->fromNodeNo = fromNodeNo;
1805         rdmaPacket->localBuffer = (void *)buffer;
1806         
1807         buffer->type = BUFFER_RDMA;
1808         buffer->size = rdmaPacket->remoteSize;
1809         
1810         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1811 //      CmiAssert(buffer->buf != NULL);
1812
1813         buffer->key = METADATAFIELD(buffer->buf)->key;
1814
1815         
1816         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1817         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1818 //      CmiAssert(buffer->key != NULL);
1819         
1820         {
1821                 struct ibv_sge list = {
1822                         .addr = (uintptr_t )buffer->buf,
1823                         .length = buffer->size,
1824                         .lkey   = buffer->key->lkey
1825                 };
1826
1827                 struct ibv_send_wr *bad_wr;
1828                 struct ibv_send_wr wr = {
1829                         .wr_id = (uint64_t )rdmaPacket,
1830                         .sg_list = &list,
1831                         .num_sge = 1,
1832                         .opcode = IBV_WR_RDMA_READ,
1833                         .send_flags = IBV_SEND_SIGNALED,
1834                         .wr.rdma = {
1835                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1836                                 .rkey = rdmaPacket->key.rkey
1837                         }
1838                 };
1839                 /** post and rdma_read that is a rdma get*/
1840                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1841                         CmiAssert(0);
1842                 }
1843         }
1844
1845 };
1846
1847 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
1848 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1849
1850 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1851                 //rdma get done
1852 #if CMK_IBVERBS_STATS
1853         double _startRegTime;
1854 #endif  
1855
1856         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1857 /*      if(rdmaPacket->type == INFI_DIRECT){
1858                 processDirectWC(rdmaPacket);
1859                 return;
1860         }*/
1861 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1862         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1863
1864         /*TODO: remove this
1865         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1866         
1867 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1868         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1869         
1870         {
1871                 int size;
1872                 int rank, srcpe, seqno, magic, i;
1873                 unsigned int broot;
1874                 char *msg = buffer->buf;
1875                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1876                 size = CmiMsgHeaderGetLength(msg);
1877 /*              CmiAssert(size == buffer->size);*/
1878                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1879         }
1880         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1881
1882         
1883         free(buffer);
1884         
1885         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1886         //we are sending this ack as a response to a successful
1887         // rdma_Read.. the token for that rdma_Read needs to be freed
1888 #if CMK_IBVERBS_TOKENS_FLOW     
1889         //node->infiData->tokensLeft++;
1890         context->tokensLeft++;
1891 #endif
1892
1893         //send ack to sender if toBuffer is off otherwise buffer it
1894         if(toBuffer){
1895                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1896                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1897                 context->bufferedRdmaAcks = rdmaPacket;
1898                 rdmaPacket->next = tmp;
1899                 rdmaPacket->prev = NULL;
1900                 if(tmp != NULL){
1901                         tmp->prev = rdmaPacket; 
1902                 }
1903         }else{
1904                 EnqueueRdmaAck(rdmaPacket);             
1905                 free(rdmaPacket);
1906         }
1907 }
1908
1909 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1910         infiPacket packet;
1911         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1912
1913         
1914         MallocInfiPacket(packet);
1915         {
1916                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1917                 *ackPacket = *rdmaPacket;
1918                 packet->size = sizeof(struct infiRdmaPacket);
1919                 packet->buf = (char *)ackPacket;
1920                 packet->header.code = INFIRDMA_ACK;
1921                 packet->ogm=NULL;
1922                 
1923                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1924         
1925
1926                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1927         }
1928 };
1929
1930
1931 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1932         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1933         rdmaPacket->ogm->refcount--;
1934         GarbageCollectMsg(rdmaPacket->ogm);
1935 }
1936
1937
1938 /****************************
1939  Deal with all the buffered (delayed) messages
1940  such as processing recvd broadcasts, sending
1941  rdma acks and processing recvd rdma requests
1942 ******************************/
1943
1944
1945 static inline infiBufferedBcastPool createBcastPool(){
1946         int i;
1947         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1948         ret->count = 0;
1949         ret->next = ret->prev = NULL;   
1950         for(i=0;i<BCASTLIST_SIZE;i++){
1951                 ret->bcastList[i].valid = 0;
1952         }
1953         return ret;
1954 };
/****
	The buffered bcast messages are stored in a doubly linked list of
	arrays (blocks).
	To keep the average insert cost low, a new block is added at the
	head of the list, so the blocks end up in reverse order of creation.
	Within a block, however, the bcasts are stored in increasing sequence order.
*****/
1962
1963 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1964         if(context->bufferedBcastList == NULL){
1965                 context->bufferedBcastList = createBcastPool();
1966         }else{
1967                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1968                         infiBufferedBcastPool tmp;
1969                         tmp = createBcastPool();
1970                         context->bufferedBcastList->prev = tmp;
1971                         tmp->next = context->bufferedBcastList;
1972                         context->bufferedBcastList = tmp;
1973                 }
1974         }
1975         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1976         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1977         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1978         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1979         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1980         
1981         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1982         
1983         context->bufferedBcastList->count++;
1984 }
1985
/*********
	Go through the blocks of buffered bcast messages. The last block in
	the list is processed first; processing within a block is in sequence, though.
*********/
1990 static inline void processBufferedBcast(){
1991         infiBufferedBcastPool start;
1992
1993         if(context->bufferedBcastList == NULL){
1994                 return;
1995         }
1996         start = context->bufferedBcastList;
1997         if(context->insideProcessBufferedBcasts==1){
1998                 return;
1999         }
2000         context->insideProcessBufferedBcasts=1;
2001
2002         while(start->next != NULL){
2003                 start = start->next;
2004         }
2005         
2006         while(start != NULL){
2007                 int i=0;
2008                 infiBufferedBcastPool tmp;
2009                 if(start->count != 0){
2010                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
2011                 }
2012                 for(i=0;i<start->count;i++){
2013                         if(start->bcastList[i].valid == 0){
2014                                 continue;
2015                         }
2016                         start->bcastList[i].valid=0;
2017                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
2018 #if CMK_BROADCAST_SPANNING_TREE
2019         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
2020 #if CMK_NODE_QUEUE_AVAILABLE
2021           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
2022 #endif
2023          ){
2024                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
2025                 CmiFree(start->bcastList[i].msg);           /* gzheng */
2026                                         }
2027 #elif CMK_BROADCAST_HYPERCUBE
2028         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
2029 #if CMK_NODE_QUEUE_AVAILABLE
2030           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
2031 #endif
2032          ){
2033                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
2034                 CmiFree(start->bcastList[i].msg);           /* gzheng */
2035                                         }
2036 #endif
2037                 }
2038                 if(start->count != 0){
2039                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
2040                 }
2041                 
2042                 tmp = start;
2043                 start = start->prev;
2044                 free(tmp);
2045                 if(start != NULL){
2046                         //not the first one
2047                         start->next = NULL;
2048                 }
2049         }
2050
2051         context->bufferedBcastList = NULL;
2052 /*      context->bufferedBcastList->prev = NULL;
2053         context->bufferedBcastList->count =0;   */
2054         context->insideProcessBufferedBcasts=0;
2055         MACHSTATE(2,"processBufferedBcast done ");
2056 };
2057
2058
2059 static inline void processBufferedRdmaAcks(){
2060         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
2061         if(start == NULL){
2062                 return;
2063         }
2064         while(start->next != NULL){
2065                 start = start->next;
2066         }
2067         while(start != NULL){
2068                 struct infiRdmaPacket *rdmaPacket=start;
2069                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
2070                 EnqueueRdmaAck(rdmaPacket);
2071                 start = start->prev;
2072                 free(rdmaPacket);
2073         }
2074         context->bufferedRdmaAcks=NULL;
2075 }
2076
2077
2078
2079 static inline void processBufferedRdmaRequests(){
2080         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2081         if(start == NULL){
2082                 return;
2083         }
2084         
2085         
2086         while(start->next != NULL){
2087                 start = start->next;
2088         }
2089         while(start != NULL){
2090                 struct infiRdmaPacket *rdmaPacket=start;
2091                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2092                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2093                 start = start->prev;
2094         }
2095         
2096         context->bufferedRdmaRequests=NULL;
2097 }
2098
2099
2100
2101
2102
/**
 * Drain everything that was buffered: broadcasts first, then rdma acks,
 * then rdma requests. With CMK_IBVERBS_STATS on, the call is counted and
 * its wall time is added to processBufferedTime.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double began = CmiWallTimer();
	processBufferedCount++;
#endif

	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
	processBufferedTime += (CmiWallTimer()-began);
#endif
}
2116
2117
2118 /*************************
2119         Increase tokens when short of them
2120 **********/
2121 static inline void increaseTokens(OtherNode node){
2122         int err;
2123         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2124         if(node->infiData->totalTokens + increase > maxTokens){
2125                 increase = maxTokens-node->infiData->totalTokens;
2126         }
2127         node->infiData->totalTokens += increase;
2128         node->infiData->tokensLeft += increase;
2129         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2130         //increase the size of the sendCq
2131         int currentCqSize = context->sendCqSize;
2132         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2133                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2134                 CmiAssert(0);
2135         }
2136         context->sendCqSize+= increase;
2137 };
2138
2139 static void increasePostedRecvs(int nodeNo){
2140         OtherNode node = &nodes[nodeNo];
2141         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2142         int recvIncrease = tokenIncrease;
2143         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2144                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2145         }
2146         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2147                 recvIncrease = maxRecvBuffers-context->srqSize;
2148         }
2149         node->infiData->postedRecvs+= recvIncrease;
2150         context->srqSize += recvIncrease;
2151         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2152         //increase the size of the recvCq
2153         int currentCqSize = context->recvCqSize;
2154         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2155                 CmiAssert(0);
2156         }
2157         context->recvCqSize += tokenIncrease;
2158         if(recvIncrease > 0){
2159                 //create another bufferPool and attach it to the top of the current one
2160                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2161                 newPool->next = context->recvBufferPool;
2162                 context->recvBufferPool = newPool;
2163                 postInitialRecvs(newPool,recvIncrease,packetSize);
2164         }
2165
2166 };
2167
2168
2169
2170
2171 /*********************************************
2172         Memory management routines for RDMA
2173
2174 ************************************************/
2175
2176 /**
2177         There are INFINUMPOOLS of memory.
2178         The first pool is of size firstBinSize.
2179         The ith pool is of size firstBinSize*2^i
2180 */
2181
2182 static void initInfiCmiChunkPools(){
2183         int i,j;
2184         int size = firstBinSize;
2185         int nodeSize;
2186
2187 #if THREAD_MULTI_POOL
2188         nodeSize = CmiMyNodeSize() + 1;
2189         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2190         for(i = 0; i < nodeSize; i++){
2191                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2192         }
2193         for(j = 0; j < nodeSize; j++){
2194                 size = firstBinSize;
2195                 for(i=0;i<INFINUMPOOLS;i++){
2196                         infiCmiChunkPools[j][i].size = size;
2197                         infiCmiChunkPools[j][i].startBuf = NULL;
2198                         infiCmiChunkPools[j][i].count = 0;
2199                         size *= 2;
2200                 }
2201         }
2202
2203         // creating the n^2 system of queues
2204         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2205         for(i = 0; i < nodeSize; i++){
2206                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2207         }
2208         for(i = 0; i < nodeSize; i++)
2209                 for(j = 0; j < nodeSize; j++)
2210                         queuePool[i][j] = PCQueueCreate();
2211
2212 #else
2213
2214         size = firstBinSize;    
2215         for(i=0;i<INFINUMPOOLS;i++){
2216                 infiCmiChunkPools[i].size = size;
2217                 infiCmiChunkPools[i].startBuf = NULL;
2218                 infiCmiChunkPools[i].count = 0;
2219                 size *= 2;
2220         }
2221 #endif
2222
2223 }
2224
2225 /***********
2226 Register memory for a part of a received multisend message
2227 *************/
2228 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2229         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2230         char *res=msg-sizeof(infiCmiChunkHeader);
2231         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2232 #if CMK_IBVERBS_STATS
2233         numCurReg++;
2234         numReg++;
2235         numMultiSend++;
2236 #endif
2237         CmiAssert(metaData->key!=NULL);
2238         metaData->owner = NULL;
2239         metaData->poolIdx = INFIMULTIPOOL;
2240
2241         return metaData;
2242 };
2243
2244
2245 #if THREAD_MULTI_POOL
2246
2247 // Fills up the buffer pools for every thread in the node
2248 static inline void fillBufferPools(){
2249         int nodeSize, poolIdx, thread;
2250         infiCmiChunkMetaData *metaData;         
2251         infiCmiChunkHeader *hdr;
2252         int allocSize;
2253         int count=1;
2254         int i;
2255         struct ibv_mr *key;
2256         void *res;
2257
2258         // initializing values
2259         nodeSize = CmiMyNodeSize() + 1;
2260
2261         // iterating over all threads and all pools
2262         for(thread = 0; thread < nodeSize; thread++){
2263                 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2264                         allocSize = infiCmiChunkPools[thread][poolIdx].size;
2265                         if(poolIdx < blockThreshold){
2266                                 count = blockAllocRatio;
2267                         }else{
2268                                 count = 1;
2269                         }
2270                         res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2271                         hdr = res;
2272                         key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2273                         CmiAssert(key != NULL);
2274 #if CMK_IBVERBS_STATS
2275                 numCurReg++;
2276                 numReg++;
2277 #endif
2278                         res += sizeof(infiCmiChunkHeader);
2279                         for(i=0;i<count;i++){
2280                                 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2281                                 metaData->key = key;
2282                                 metaData->owner = hdr;
2283                                 metaData->poolIdx = poolIdx;
2284                                 metaData->parentPe = thread;                                            // setting the parent PE
2285                                 if(i == 0){
2286                                         metaData->owner->metaData->count = count;
2287                                         metaData->nextBuf = NULL;
2288                                 }else{
2289                                         void *startBuf = res - sizeof(infiCmiChunkHeader);
2290                                         metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2291                                         infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2292                                         infiCmiChunkPools[thread][poolIdx].count++;
2293                                 }
2294                                 if(i != count-1){
2295                                         res += (allocSize+sizeof(infiCmiChunkHeader));
2296                                 }
2297                         }
2298                 }
2299         }       
2300 }
2301
2302 static inline void *getInfiCmiChunkThread(int dataSize){
2303         //find out to which pool this dataSize belongs to
2304         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2305         int ratio = dataSize/firstBinSize;
2306         int poolIdx=0;
2307         void *res;
2308         int i,j,nodeSize;
2309         void *pointer;
2310
2311         //printf("Hi\n");
2312         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2313         MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2314         
2315         while(ratio > 0){
2316                 ratio  = ratio >> 1;
2317                 poolIdx++;
2318         }
2319         MACHSTATE1(2,"This is %d",CmiMyRank());
2320         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2321
2322         // checking whether to analyze the free queues to reuse buffers 
2323         nodeSize = CmiMyNodeSize() + 1;
2324         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2325                 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2326                 for(i = 0; i < nodeSize; i++){
2327                         if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2328                                 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2329                                         pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2330                                         infi_CmiFreeDirect(pointer);    
2331                                 }
2332                         }
2333                 }       
2334         }
2335
2336         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2337                 infiCmiChunkMetaData *metaData;         
2338                 infiCmiChunkHeader *hdr;
2339                 int allocSize;
2340                 int count=1;
2341                 int i;
2342                 struct ibv_mr *key;
2343                 void *origres;
2344                 
2345                 
2346                 if(poolIdx < INFINUMPOOLS ){
2347                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2348                 }else{
2349                         allocSize = dataSize;
2350                 }
2351
2352                 if(poolIdx < blockThreshold){
2353                         count = blockAllocRatio;
2354                 }
2355                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2356                 _MEMCHECK(res);
2357                 hdr = res;
2358                 
2359                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2360                 CmiAssert(key != NULL);
2361 #if CMK_IBVERBS_STATS
2362                 numCurReg++;
2363                 numReg++;
2364 #endif
2365                 
2366                 origres = (res += sizeof(infiCmiChunkHeader));
2367
2368                 for(i=0;i<count;i++){
2369                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2370                         _MEMCHECK(metaData);
2371                         metaData->key = key;
2372                         metaData->owner = hdr;
2373                         metaData->poolIdx = poolIdx;
2374                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2375
2376                         if(i == 0){
2377                                 metaData->owner->metaData->count = count;
2378                                 metaData->nextBuf = NULL;
2379                         }else{
2380                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2381                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2382                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2383                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2384                                 
2385                         }
2386                         if(i != count-1){
2387                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2388                         }
2389           }     
2390                 
2391                 
2392                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2393                 
2394                 return origres;
2395         }
2396         if(poolIdx < INFINUMPOOLS){
2397                 infiCmiChunkMetaData *metaData;                         
2398         
2399                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2400                 res += sizeof(infiCmiChunkHeader);
2401
2402                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2403                 metaData = METADATAFIELD(res);
2404
2405                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2406                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2407                 
2408                 metaData->nextBuf = NULL;
2409 //              CmiAssert(metaData->poolIdx == poolIdx);
2410
2411                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2412                 return res;
2413         }
2414
2415         CmiAssert(0);
2416
2417         
2418 };
2419 #else /* not MULTIPOOL case */
2420 static inline void *getInfiCmiChunk(int dataSize){
2421         //find out to which pool this dataSize belongs to
2422         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2423         int ratio = dataSize/firstBinSize;
2424         int poolIdx=0;
2425         char *res;
2426 #if CMK_IBVERBS_STATS
2427         if(numAlloc>10000 && numAlloc%1000==0)
2428           {
2429           printf("[%d] numReg %d numUnReg %d numCurReg %d numAlloc %d numFree %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,numReg, numUnReg, numCurReg, numAlloc, numFree, msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
2430           /*      printf("[%d]  numMultiSendUnreg %d numMultiSend %d  numMultiSendFree %d\n",_Cmi_mynode, numMultiSendUnreg, numMultiSend, numMultiSendFree);*/
2431           }
2432 #endif
2433         while(ratio > 0){
2434                 ratio  = ratio >> 1;
2435                 poolIdx++;
2436         }
2437         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2438         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2439                 infiCmiChunkMetaData *metaData;
2440                 infiCmiChunkHeader *hdr;
2441                 int allocSize;
2442                 int count=1;
2443                 int i;
2444                 struct ibv_mr *key;
2445                 void *origres;
2446
2447
2448                 if(poolIdx < INFINUMPOOLS ){
2449                         allocSize = infiCmiChunkPools[poolIdx].size;
2450                 }else{
2451                         allocSize = dataSize;
2452                 }
2453
2454                 if(poolIdx < blockThreshold){
2455                         count = blockAllocRatio;
2456                 }
2457                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2458                 hdr = (infiCmiChunkHeader *)res;
2459
2460                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2461                 CmiAssert(key != NULL);
2462 #if CMK_IBVERBS_STATS
2463                 numCurReg++;
2464                 numReg++;
2465 #endif
2466                 origres = (res += sizeof(infiCmiChunkHeader));
2467
2468                 for(i=0;i<count;i++){
2469                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2470                         metaData->key = key;
2471                         metaData->owner = hdr;
2472                         metaData->poolIdx = poolIdx;
2473
2474                         if(i == 0){
2475                                 metaData->owner->metaData->count = count;
2476                                 metaData->nextBuf = NULL;
2477                         }else{
2478                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2479                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2480                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2481                                 infiCmiChunkPools[poolIdx].count++;
2482
2483                         }
2484                         if(i != count-1){
2485                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2486                         }
2487           }
2488
2489
2490                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2491
2492                 return origres;
2493         }
2494         if(poolIdx < INFINUMPOOLS){
2495                 infiCmiChunkMetaData *metaData;
2496
2497                 res = infiCmiChunkPools[poolIdx].startBuf;
2498                 res += sizeof(infiCmiChunkHeader);
2499
2500                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2501                 metaData = METADATAFIELD(res);
2502
2503                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2504                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2505
2506                 metaData->nextBuf = NULL;
2507 //              CmiAssert(metaData->poolIdx == poolIdx);
2508
2509                 infiCmiChunkPools[poolIdx].count--;
2510                 return res;
2511         }
2512
2513         CmiAssert(0);
2514
2515
2516 };
2517 #endif
2518
2519
2520 void * infi_CmiAlloc(int size){
2521         char *res;
2522 #if CMK_IBVERBS_STATS
2523         numAlloc++;
2524 #endif
2525         if (Cmi_charmrun_fd == -1) return malloc(size);
2526 #if THREAD_MULTI_POOL
2527         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2528         res -= sizeof(CmiChunkHeader);
2529
2530         return res;
2531 #else
2532 #if CMK_SMP
2533         CmiMemLock();
2534 #endif
2535 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2536                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2537
2538                 res = (char*)getInfiCmiChunk(size-sizeof(CmiChunkHeader));      
2539                 res -= sizeof(CmiChunkHeader);
2540 #if CMK_SMP     
2541         CmiMemUnlock();
2542 #endif
2543 /*      }else{
2544                 res = malloc(size);
2545         }*/
2546         
2547         return res;
2548 #endif
2549 }
2550
2551 #if THREAD_MULTI_POOL
2552 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2553 void infi_CmiFreeDirect(void *ptr){
2554         int size;
2555         int parentPe;
2556         void *freePtr = ptr;
2557 #if CMK_IBVERBS_STATS
2558         numFree++;
2559 #endif
2560
2561         //ptr += sizeof(CmiChunkHeader);
2562         size = SIZEFIELD (ptr);
2563 /*      if(size > firstBinSize){*/
2564         infiCmiChunkMetaData *metaData;
2565         int poolIdx;
2566         //there is a infiniband specific header
2567         freePtr = ptr - sizeof(infiCmiChunkHeader);
2568         metaData = METADATAFIELD(ptr);
2569         poolIdx = metaData->poolIdx;
2570         infiCmiChunkPool *pool = infiCmiChunkPools[CmiMyRank()] + poolIdx;
2571         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2572 //      CmiAssert(poolIdx >= 0);
2573         if(poolIdx < INFINUMPOOLS && pool->count < INFIMAXPERPOOL &&
2574            pool->count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2575           metaData->nextBuf = pool->startBuf;
2576           pool->startBuf = freePtr;
2577           pool->count++;
2578           MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,pool->startBuf,pool->count);
2579         }else{
2580           MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2581           metaData->owner->metaData->count--;
2582           if(metaData->owner->metaData == metaData){
2583             //I am the owner
2584             if(metaData->owner->metaData->count == 0){
2585               //all the chunks have been freed
2586               int unregstat=ibv_dereg_mr(metaData->key);
2587 #if CMK_IBVERBS_STATS
2588               numUnReg++;
2589               numCurReg--;
2590 #endif
2591
2592               CmiAssert(unregstat==0);
2593               free(freePtr);
2594               free(metaData);
2595             }
2596             //if I am the owner and all the chunks have not been
2597             // freed dont free my metaData. will need later
2598           }else{
2599             if(metaData->owner->metaData->count == 0){
2600               //need to free the owner's buffer and metadata
2601               freePtr = metaData->owner;
2602               int unregstat=ibv_dereg_mr(metaData->key);
2603 #if CMK_IBVERBS_STATS
2604               numUnReg++;
2605               numCurReg--;
2606 #endif
2607
2608               CmiAssert(unregstat==0);
2609               free(metaData->owner->metaData);
2610               free(freePtr);
2611             }
2612             free(metaData);
2613           }
2614         }
2615 }
2616
2617
2618 void infi_CmiFree(void *ptr){
2619
2620         int i,j;
2621         int size;
2622         int parentPe;
2623         int nodeSize;
2624         void *pointer;
2625         void *freePtr = ptr;
2626         nodeSize = CmiMyNodeSize() + 1;
2627
2628         MACHSTATE(3,"Freeing");
2629
2630         ptr += sizeof(CmiChunkHeader);
2631         size = SIZEFIELD (ptr);
2632 /*      if(size > firstBinSize){*/
2633         infiCmiChunkMetaData *metaData;
2634         int poolIdx;
2635         //there is a infiniband specific header
2636         freePtr = ptr - sizeof(infiCmiChunkHeader);
2637         metaData = METADATAFIELD(ptr);
2638         poolIdx = metaData->poolIdx;
2639
2640         if(poolIdx == INFIMULTIPOOL){
2641                 /** this is a part of a received mult message  
2642                     it will be freed correctly later
2643                 **/
2644 #if CMK_IBVERBS_STATS
2645           numMultiSendFree++;
2646 #endif
2647           return;
2648         }
2649
2650
2651         // checking if this free operation is my responsibility
2652         parentPe = metaData->parentPe;
2653         if(parentPe != CmiMyRank()){
2654                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2655                 return;
2656         }
2657
2658
2659         infi_CmiFreeDirect(ptr);
2660
2661 }
2662
2663 #else
2664 void infi_CmiFree(void *ptr){
2665         int size;
2666         void *freePtr = ptr;
2667 #if CMK_IBVERBS_STATS
2668         numFree++;
2669 #endif
2670         
2671         if (Cmi_charmrun_fd == -1) return free(ptr);
2672 #if CMK_SMP     
2673         CmiMemLock();
2674 #endif
2675         ptr += sizeof(CmiChunkHeader);
2676         size = SIZEFIELD (ptr);
2677 /*      if(size > firstBinSize){*/
2678                 infiCmiChunkMetaData *metaData;
2679                 int poolIdx;
2680                 //there is a infiniband specific header
2681                 freePtr = (char*)ptr - sizeof(infiCmiChunkHeader);
2682                 metaData = METADATAFIELD(ptr);
2683                 poolIdx = metaData->poolIdx;
2684                 if(poolIdx == INFIMULTIPOOL){
2685                         /** this is a part of a received mult message  
2686                         it will be freed correctly later
2687                         **/
2688 #if CMK_IBVERBS_STATS
2689                         numMultiSendFree++;
2690 #endif
2691                         return;
2692                 }
2693                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2694 //              CmiAssert(poolIdx >= 0);
2695                 if(poolIdx < INFINUMPOOLS &&
2696                    infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL &&
2697                    infiCmiChunkPools[poolIdx].count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2698                   metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2699                   infiCmiChunkPools[poolIdx].startBuf = freePtr;
2700                   infiCmiChunkPools[poolIdx].count++;
2701                         
2702                   MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2703                 }else{
2704                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2705                         metaData->owner->metaData->count--;
2706                         if(metaData->owner->metaData == metaData){
2707                                 //I am the owner
2708                                 if(metaData->owner->metaData->count == 0){
2709                                         //all the chunks have been freed
2710                                         int unregstat=ibv_dereg_mr(metaData->key);
2711 #if CMK_IBVERBS_STATS
2712                                         numUnReg++;
2713                                         numCurReg--;
2714 #endif
2715
2716                                         CmiAssert(unregstat==0);
2717                                         free(freePtr);
2718                                         free(metaData);
2719                                 }
2720                                 //if I am the owner and all the chunks have not been
2721                                 // freed dont free my metaData. will need later
2722                         }else{
2723                                 if(metaData->owner->metaData->count == 0){
2724                                         //need to free the owner's buffer and metadata
2725                                         freePtr = metaData->owner;
2726                                         int unregstat=ibv_dereg_mr(metaData->key);
2727 #if CMK_IBVERBS_STATS
2728                                         numUnReg++;
2729                                         numCurReg--;
2730 #endif
2731
2732                                         CmiAssert(unregstat==0);
2733                                         free(metaData->owner->metaData);
2734                                         free(freePtr);
2735                                 }
2736                                 free(metaData);
2737                         }
2738                 }       
2739 #if CMK_SMP     
2740         CmiMemUnlock();
2741 #endif
2742 /*      }else{
2743                 free(freePtr);
2744         }*/
2745 }
2746 #endif
2747
2748 /*********************************************************************************************
This section implements CmiDirect, a variant of persistent communication in which
the user can transfer data between processors without using Charm++ messages. This lets the user
send and receive data from the middle of their arrays without any copying on either the send or
the receive side.
2753 *********************************************************************************************/
2754
2755 struct infiDirectRequestPacket{
2756         int senderProc;
2757         int handle;
2758         struct ibv_mr senderKey;
2759         void *senderBuf;
2760         int senderBufSize;
2761 };
2762
2763 #include "cmidirect.h"
2764
2765 #define MAXHANDLES 512
2766
2767 struct infiDirectHandleStruct;
2768
2769
/**
 * Node of the singly linked polling queue. Each node lives inside the
 * infiDirectHandle it refers to (see the pollingQNode member), so nodes are
 * never allocated or freed on their own.
 */
typedef struct directPollingQNodeStruct {
	struct infiDirectHandleStruct *handle;   /* handle this node belongs to */
	struct directPollingQNodeStruct *next;   /* next node in the queue, NULL at the tail */
	double *lastDouble;                      /* address of the last double of the receive buffer */
} directPollingQNode;
2775
2776 typedef struct infiDirectHandleStruct{
2777         int id;
2778         void *buf;
2779         int size;
2780         struct ibv_mr *key;
2781         void (*callbackFnPtr)(void *);
2782         void *callbackData;
2783 //      struct infiDirectRequestPacket *packet;
2784         struct infiDirectUserHandle userHandle;
2785         struct infiRdmaPacket *rdmaPacket;
2786         directPollingQNode pollingQNode;
2787 }       infiDirectHandle;
2788
2789 typedef struct infiDirectHandleTableStruct{
2790         infiDirectHandle handles[MAXHANDLES];
2791         struct infiDirectHandleTableStruct *next;
2792 } infiDirectHandleTable;
2793
2794
2795 // data structures 
2796
2797 directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;
2798
2799 static infiDirectHandleTable **sendHandleTable=NULL;
2800 static infiDirectHandleTable **recvHandleTable=NULL;
2801
2802 static int *recvHandleCount=NULL;
2803
2804 void addHandleToPollingQ(infiDirectHandle *handle){
2805 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2806         directPollingQNode *newNode = &(handle->pollingQNode);
2807         newNode->handle = handle;
2808         newNode->next = NULL;
2809         if(headDirectPollingQ==NULL){
2810                 /*empty pollingQ*/
2811                 headDirectPollingQ = newNode;
2812                 tailDirectPollingQ = newNode;
2813         }else{
2814                 tailDirectPollingQ->next = newNode;
2815                 tailDirectPollingQ = newNode;
2816         }
2817 };
2818 /*
2819 infiDirectHandle *removeHandleFromPollingQ(){
2820         if(headDirectPollingQ == NULL){
2821                 //polling Q is empty
2822                 return NULL;
2823         }
2824         directPollingQNode *retNode = headDirectPollingQ;
2825         if(headDirectPollingQ == tailDirectPollingQ){
2826                 //PollingQ has one node 
2827                 headDirectPollingQ = tailDirectPollingQ = NULL;
2828         }else{
2829                 headDirectPollingQ = headDirectPollingQ->next;
2830         }
2831         infiDirectHandle *retHandle = retNode->handle;
2832         free(retNode);
2833         return retHandle;
2834 }*/
2835
2836 static inline infiDirectHandleTable **createHandleTable(){
2837         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2838         int i;
2839         for(i=0;i<_Cmi_numnodes;i++){
2840                 table[i] = NULL;
2841         }
2842         return table;
2843 }
2844
2845 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2846         *tableIdx = handle/MAXHANDLES;
2847         *idx = handle%MAXHANDLES;
2848 };
2849
/**
 * Store initialValue in the last sizeof(double) bytes of recvBuf, where
 * recvBufSize is the size of the buffer in bytes.
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	char *bufEnd = (char *)recvBuf + recvBufSize;
	double *sentinel = (double *)(bufEnd - sizeof(double));

	*sentinel = initialValue;
}
2857
2858
2859 /**
2860  To be called on the receiver to create a handle and return its number
2861 **/
2862 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2863         int newHandle;
2864         int tableIdx,idx;
2865         int i;
2866         infiDirectHandleTable *table;
2867         struct infiDirectUserHandle userHandle;
2868         
2869         CmiAssert(recvBufSize > sizeof(double));
2870         
2871         if(recvHandleTable == NULL){
2872                 recvHandleTable = createHandleTable();
2873                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2874                 for(i=0;i<_Cmi_numnodes;i++){
2875                         recvHandleCount[i] = -1;
2876                 }
2877         }
2878         if(recvHandleTable[senderNode] == NULL){
2879                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2880                 recvHandleTable[senderNode]->next = NULL;               
2881         }
2882         
2883         newHandle = ++recvHandleCount[senderNode];
2884         CmiAssert(newHandle >= 0);
2885         
2886         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2887         
2888         table = recvHandleTable[senderNode];
2889         for(i=0;i<tableIdx;i++){
2890                 if(table->next ==NULL){
2891                         table->next = malloc(sizeof(infiDirectHandleTable));
2892                         table->next->next = NULL;
2893                 }
2894                 table = table->next;
2895         }
2896         table->handles[idx].id = newHandle;
2897         table->handles[idx].buf = recvBuf;
2898         table->handles[idx].size = recvBufSize;
2899 #if CMI_DIRECT_DEBUG
2900         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2901 #endif
2902         table->handles[idx].callbackFnPtr = callbackFnPtr;
2903         table->handles[idx].callbackData = callbackData;
2904         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2905         CmiAssert(table->handles[idx].key != NULL);
2906 #if CMK_IBVERBS_STATS
2907         numCurReg++;
2908         numReg++;
2909 #endif
2910 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2911         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2912         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2913         
2914         userHandle.handle = newHandle;
2915         userHandle.recverNode = _Cmi_mynode;
2916         userHandle.senderNode = senderNode;
2917         userHandle.recverBuf = recvBuf;
2918         userHandle.recverBufSize = recvBufSize;
2919         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2920         userHandle.initialValue = initialValue;
2921         
2922         table->handles[idx].userHandle = userHandle;
2923         
2924         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2925
2926   {
2927          int index = table->handles[idx].size - sizeof(double);
2928    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2929         } 
2930         
2931         addHandleToPollingQ(&(table->handles[idx]));
2932         
2933 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2934         
2935         return userHandle;
2936 }
2937
2938 /****
2939  To be called on the sender to attach the sender's buffer to this handle
2940 ******/
2941 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2942         int tableIdx,idx;
2943         int i;
2944         int handle = userHandle->handle;
2945         int recverNode  = userHandle->recverNode;
2946
2947         infiDirectHandleTable *table;
2948
2949         if(sendHandleTable == NULL){
2950                 sendHandleTable = createHandleTable();
2951         }
2952         if(sendHandleTable[recverNode] == NULL){
2953                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2954                 sendHandleTable[recverNode]->next = NULL;
2955         }
2956         
2957         CmiAssert(handle >= 0);
2958         calcHandleTableIdx(handle,&tableIdx,&idx);
2959         
2960         table = sendHandleTable[recverNode];
2961         for(i=0;i<tableIdx;i++){
2962                 if(table->next ==NULL){
2963                         table->next = malloc(sizeof(infiDirectHandleTable));
2964                         table->next->next = NULL;
2965                 }
2966                 table = table->next;
2967         }
2968
2969         table->handles[idx].id = handle;
2970         table->handles[idx].buf = sendBuf;
2971
2972         table->handles[idx].size = sendBufSize;
2973 #if CMI_DIRECT_DEBUG
2974         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2975 #endif
2976         table->handles[idx].callbackFnPtr = NULL;
2977         table->handles[idx].callbackData = NULL;
2978         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2979         CmiAssert(table->handles[idx].key != NULL);
2980 #if CMK_IBVERBS_STATS
2981         numCurReg++;
2982         numReg++;
2983 #endif
2984         table->handles[idx].userHandle = *userHandle;
2985         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2986         
2987         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2988         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2989         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2990         
2991         
2992 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2993         table->handles[idx].packet->senderProc = _Cmi_mynode;
2994         table->handles[idx].packet->handle = handle;
2995         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2996         table->handles[idx].packet->senderBuf = sendBuf;
2997         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2998         
2999         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
3000 };
3001
3002
3003
3004
3005
3006 /****
3007 To be called on the sender to do the actual data transfer
3008 ******/
3009 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
3010         int handle = userHandle->handle;
3011         int recverNode = userHandle->recverNode;
3012         if(recverNode == _Cmi_mynode){
3013                 /*when the sender and receiver are on the same
3014                 processor, just look up the sender and receiver
3015                 buffers and do a memcpy*/
3016
3017                 infiDirectHandleTable *senderTable;
3018                 infiDirectHandleTable *recverTable;
3019                 
3020                 int tableIdx,idx,i;
3021
3022                 
3023                 /*find entry for this handle in sender table*/
3024                 calcHandleTableIdx(handle,&tableIdx,&idx);
3025                 CmiAssert(sendHandleTable!= NULL);
3026                 senderTable = sendHandleTable[_Cmi_mynode];
3027                 CmiAssert(senderTable != NULL);
3028                 for(i=0;i<tableIdx;i++){
3029                         senderTable = senderTable->next;
3030                 }
3031
3032                 /**find entry for this handle in recver table*/
3033                 recverTable = recvHandleTable[recverNode];
3034                 CmiAssert(recverTable != NULL);
3035                 for(i=0;i< tableIdx;i++){
3036                         recverTable = recverTable->next;
3037                 }
3038                 
3039                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
3040                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
3041 #if CMI_DIRECT_DEBUG
3042                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
3043 #endif
3044                 // The polling Q should find you and handle the callback and pollingq entry
3045                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
3046                 
3047
3048         }else{
3049                 infiPacket packet;
3050                 int tableIdx,idx;
3051                 int i;
3052                 OtherNode node;
3053                 infiDirectHandleTable *table;
3054
3055                 calcHandleTableIdx(handle,&tableIdx,&idx);
3056
3057                 table = sendHandleTable[recverNode];
3058                 CmiAssert(table != NULL);
3059                 for(i=0;i<tableIdx;i++){
3060                         table = table->next;
3061                 }
3062
3063 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
3064 #if CMI_DIRECT_DEBUG
3065                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
3066 #endif
3067
3068                 
3069                 {
3070                         
3071                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
3072                         struct ibv_sge list = {
3073                                 .addr = (uintptr_t )table->handles[idx].buf,
3074                                 .length = table->handles[idx].size,
3075                                 .lkey   = table->handles[idx].key->lkey
3076                         };
3077                         
3078                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
3079
3080                         struct ibv_send_wr *bad_wr;
3081                         struct ibv_send_wr wr = {
3082                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3083                                 .sg_list = &list,
3084                                 .num_sge = 1,
3085                                 .opcode = IBV_WR_RDMA_WRITE,
3086                                 .send_flags = IBV_SEND_SIGNALED,
3087                                 
3088                                 .wr.rdma = {
3089                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
3090                                         .rkey = remoteKey->rkey
3091                                 }
3092                         };
3093                         /** post and rdma_read that is a rdma get*/
3094                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3095                                 CmiAssert(0);
3096                         }
3097                 }
3098
3099         /*      MallocInfiPacket (packet);
3100                 {
3101                         packet->size = sizeof(struct infiDirectRequestPacket);
3102                         packet->buf = (char *)(table->handles[idx].packet);
3103                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3104                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3105                 }*/
3106         }
3107
3108 };
3109
3110 /**** need not be called the first time *********/
3111 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3112   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3113 }
3114
3115 /**** need not be called the first time *********/
3116 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3117         int handle = userHandle->handle;
3118         int tableIdx,idx,i;
3119         infiDirectHandleTable *table;
3120         calcHandleTableIdx(handle,&tableIdx,&idx);
3121         
3122         table = recvHandleTable[userHandle->senderNode];
3123         CmiAssert(table != NULL);
3124         for(i=0;i<tableIdx;i++){
3125                 table = table->next;
3126         }
3127 #if CMI_DIRECT_DEBUG
3128   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3129 #endif  
3130         addHandleToPollingQ(&(table->handles[idx]));
3131         
3132
3133 }
3134
3135 /**** need not be called the first time *********/
/**
 * Re-arm a receive handle: reset the last double of the receive buffer to
 * initialValue and put the handle back on the polling queue. This is the
 * combination of CmiDirect_readyMark() followed by CmiDirect_readyPollQ().
 */
void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
	/* first restore the sentinel ... */
	CmiDirect_readyMark(userHandle);
	/* ... then start polling the handle again */
	CmiDirect_readyPollQ(userHandle);
}
3156
3157
3158 static int receivedDirectMessage(infiDirectHandle *handle){
3159 //      int index = handle->size - sizeof(double);
3160 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
3161         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3162                 return 0;
3163         }else{
3164                 (*(handle->callbackFnPtr))(handle->callbackData);       
3165                 return 1;
3166         }
3167         
3168 }
3169
3170
3171 static void pollCmiDirectQ(){
3172         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
3173         while(ptr != NULL){
3174                 if(receivedDirectMessage(ptr->handle)){
3175 #if CMI_DIRECT_DEBUG
3176       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
3177 #endif
3178                         directPollingQNode *delPtr = ptr;
3179                         /** has been received and delete this node***/
3180                         if(prevPtr == NULL){
3181                                 /** first in the pollingQ**/
3182                                 if(headDirectPollingQ == tailDirectPollingQ){
3183                                         /**only node in pollingQ****/
3184                                         headDirectPollingQ = tailDirectPollingQ = NULL;
3185                                 }else{
3186                                         headDirectPollingQ = headDirectPollingQ->next;
3187                                 }
3188                         }else{
3189                                 if(ptr == tailDirectPollingQ){
3190                                         /**last node is being deleted**/
3191                                         tailDirectPollingQ = prevPtr;
3192                                 }
3193                                 prevPtr->next = ptr->next;
3194                         }
3195                         ptr = ptr->next;
3196                 //      free(delPtr);
3197                 }else{
3198                         prevPtr = ptr;
3199                         ptr = ptr->next;
3200                 }
3201         }
3202 }
3203
3204
3205 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3206         int senderProc = directRequestPacket->senderProc;
3207         int handle = directRequestPacket->handle;
3208         int tableIdx,idx,i;
3209         infiDirectHandleTable *table;
3210         OtherNode node = nodes_by_pe[senderProc];
3211
3212         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3213
3214         calcHandleTableIdx(handle,&tableIdx,&idx);
3215
3216         table = recvHandleTable[senderProc];
3217         CmiAssert(table != NULL);
3218         for(i=0;i<tableIdx;i++){
3219                 table = table->next;
3220         }
3221         
3222         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3223         
3224         {
3225                 struct ibv_sge list = {
3226                         .addr = (uintptr_t )table->handles[idx].buf,
3227                         .length = table->handles[idx].size,
3228                         .lkey   = table->handles[idx].key->lkey
3229                 };
3230
3231                 struct ibv_send_wr *bad_wr;
3232                 struct ibv_send_wr wr = {
3233                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3234                         .sg_list = &list,
3235                         .num_sge = 1,
3236                         .opcode = IBV_WR_RDMA_READ,
3237                         .send_flags = IBV_SEND_SIGNALED,
3238                         .wr.rdma = {
3239                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3240                                 .rkey = directRequestPacket->senderKey.rkey
3241                         }
3242                 };
3243 //       post and rdma_read that is a rdma get
3244                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3245                         CmiAssert(0);
3246                 }
3247         }
3248                         
3249         
3250 };*/
3251 /*
3252 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3253         MACHSTATE(3,"processDirectWC");
3254         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3255         (*(handle->callbackFnPtr))(handle->callbackData);
3256 };
3257 */
3258
3259 #if 0
3260
3261 // use the common one
3262
3263 static void sendBarrierMessage(int pe)
3264 {
3265   /* we will only need one packet */
3266   int size=32;
3267   OtherNode  node = nodes + pe;
3268   infiPacket packet;
3269   MallocInfiPacket(packet);
3270   packet->size = size;
3271   packet->buf = CmiAlloc(size);
3272   packet->header.code = INFIBARRIERPACKET;
3273   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3274   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3275   /*  pollSendCq(0);*/
3276   EnqueuePacket(node,packet,size,key);
3277 }
3278
3279 static void recvBarrierMessage()
3280 {
3281   int i;
3282   int ne;
3283   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3284   struct ibv_wc wc[1];
3285   struct ibv_wc *recvWC;
3286   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3287   int toBuffer=1; // buffer without processing recvd messages
3288   int barrierReached=0;
3289   struct infiBuffer *buffer = NULL;
3290   struct infiPacketHeader *header = NULL;
3291   int nodeNo=-1;
3292   int len=-1;
3293   while(!barrierReached)
3294     {
3295       /* gengbin's semantic will implode if more than one q is polled at a time */
3296       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3297       //        CmiAssert(ne >=0);
3298       if(ne != 0){
3299         MACHSTATE1(3,"recvBarrier ne %d",ne);
3300       }
3301       pollSendCq(1); 
3302       for(i=0;i<ne;i++){
3303         if(wc[i].status != IBV_WC_SUCCESS){
3304           CmiAssert(0);
3305         }
3306         switch(wc[i].opcode){
3307         case IBV_WC_RECV: /* we have something to consider*/
3308           recvWC=&wc[i];
3309           buffer = (struct infiBuffer *) recvWC->wr_id; 
3310           header = (struct infiPacketHeader *)buffer->buf;
3311           nodeNo = header->nodeNo;
3312           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3313
3314           if(header->code & INFIPACKETCODE_DATA){
3315             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3316           }
3317           if(header->code & INFIDUMMYPACKET){
3318             MACHSTATE(3,"Dummy packet");
3319           }
3320           if(header->code & INFIBARRIERPACKET){
3321             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3322             // now we are done
3323             barrierReached=1;
3324             /* semantically questionable */
3325             //processAllBufferedMsgs();
3326             //return;
3327           }
3328           if(rdma && header->code & INFIRDMA_START){
3329             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3330             //          if(toBuffer){
3331             //TODO: make a function of this and use for both acks and requests
3332             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3333             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3334             *copyPacket = *rdmaPacket;
3335             copyPacket->fromNodeNo = nodeNo;
3336             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3337             context->bufferedRdmaRequests = copyPacket;
3338             copyPacket->next = tmp;
3339             copyPacket->prev = NULL;
3340             if(tmp != NULL){
3341               tmp->prev = copyPacket;
3342             }
3343             /*          }else{
3344                         processRdmaRequest(rdmaPacket,nodeNo,0);
3345                         }*/
3346           }
3347           if(rdma && header->code & INFIRDMA_ACK){
3348             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3349             processRdmaAck(rdmaPacket);
3350           }
3351           {
3352             struct ibv_sge list = {
3353               .addr     = (uintptr_t) buffer->buf,
3354               .length = buffer->size,
3355               .lkey     = buffer->key->lkey
3356             };
3357         
3358             struct ibv_recv_wr wr = {
3359               .wr_id = (uint64_t)buffer,
3360               .sg_list = &list,
3361               .num_sge = 1,
3362               .next = NULL
3363             };
3364             struct ibv_recv_wr *bad_wr;
3365         
3366             if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3367               CmiAssert(0);
3368             }
3369           }
3370
3371           break;
3372         default:
3373           CmiAbort("Wrong type of work completion object in recvq");
3374           break;
3375         }
3376       }
3377     }
3378   /* semantically questionable */
3379   //  processAllBufferedMsgs();
3380 }
3381
3382
3383 /* happen at node level */
3384 int CmiBarrier()
3385 {
3386   int len, size, i;
3387   int status;
3388   int count = 0;
3389   OtherNode node;
3390   int numnodes = CmiNumNodes();
3391   if (CmiMyRank() == 0) {
3392     /* every one send to pe 0 */
3393     if (CmiMyNode() != 0) {
3394       sendBarrierMessage(0);
3395     }
3396     /* printf("[%d] HERE\n", CmiMyPe()); */
3397     if (CmiMyNode() == 0) 
3398     {
3399       for (count = 1; count < numnodes; count ++) 
3400       {
3401         recvBarrierMessage();
3402       }
3403       /* pe 0 broadcast */
3404       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3405         int p = i;
3406         if (p > numnodes - 1) break;
3407         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3408         sendBarrierMessage(p);
3409       }
3410     }
3411     /* non 0 node waiting */
3412     if (CmiMyNode() != 0) 
3413     {
3414       recvBarrierMessage();
3415       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3416         int p = CmiMyNode();
3417         p = BROADCAST_SPANNING_FACTOR*p + i;
3418         if (p > numnodes - 1) break;
3419         p = p%numnodes;
3420         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3421         sendBarrierMessage(p);
3422       }
3423     }
3424   }
3425   CmiNodeAllBarrier();
3426   processAllBufferedMsgs();
3427   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3428 }
3429
/**
 * One-way barrier towards node 0: everyone sends a message to node 0 and
 * goes on without waiting for a reply.
 *
 * On rank 0 of each node: nodes other than 0 send one barrier message to
 * node 0, while node 0 waits for CmiNumNodes()-1 barrier messages.  All
 * ranks then meet in CmiNodeAllBarrier(), after which the messages that were
 * buffered while waiting are processed.
 *
 * @return always 0
 */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    }
    else {
      for (i=0; i<CmiNumNodes()-1; i++)
      {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  return 0;
}
3449
3450
3451 #endif