use CMK_IBV_PORT_ATTR_HAS_LINK_LAYER to protect use of link_layer field
[charm.git] / src / arch / net / machine-ibverbs.c
1 /** @file
2  * Ibverbs (infiniband)  implementation of Converse NET version
3  * @ingroup NET
4  * contains only Ibverbs specific code for:
5  * - CmiMachineInit()
6  * - CmiCommunicationInit()
7  * - CmiNotifyStillIdle()
8  * - DeliverViaNetwork()
9  * - CommunicationServer()
10  * - CmiMachineExit()
11
12   created by 
13         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
14 */
15
16 /**
17  * @addtogroup NET
18  * @{
19  */
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #if CMK_HAS_MALLOC_H
27 #include <malloc.h>
28 #endif
29 #include <getopt.h>
30 #include <time.h>
31
32 #include <infiniband/verbs.h>
33
34 enum ibv_mtu mtu = IBV_MTU_2048;
35 static int page_size;
36 static int mtu_size;
37 static int packetSize;
38 static int dataSize;
39 static int rdma;
40 static int rdmaThreshold;
41 static int firstBinSize;
42 static int blockAllocRatio;
43 static int blockThreshold;
44
45
46 static int maxRecvBuffers;
47 static int maxTokens;
48 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
49 static int sendPacketPoolSize; /*total number of send buffers created*/
50
51 static double _startTime=0;
52 static int regCount;
53
54 static int pktCount;
55 static int msgCount;
56 static int minTokensLeft;
57
58
59 static double regTime;
60
61 static double processBufferedTime;
62 static int processBufferedCount;
63
64 #define CMK_IBVERBS_STATS 0
65 #define CMK_IBVERBS_TOKENS_FLOW 1
66 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on 
67 #define CMK_IBVERBS_DEBUG 0
68 #define CMI_DIRECT_DEBUG 0
69 #define WC_LIST_SIZE 32
70 /*#define WC_BUFFER_SIZE 100*/
71
72 #if CMK_IBVERBS_STATS
73 static int numReg=0;
74 static int numUnReg=0;
75 static int numCurReg=0;
76 static int numAlloc=0;
77 static int numFree=0;
78 static int numMultiSendUnreg=0;
79 static int numMultiSend=0;
80 static int numMultiSendFree=0;
81 #endif
82
83
84 #define INCTOKENS_FRACTION 0.04
85 #define INCTOKENS_INCREASE .50
86
87 // flag for using a pool for every thread in SMP mode
88 #if CMK_SMP
89 #define THREAD_MULTI_POOL 1
90 #endif
91
92 #if THREAD_MULTI_POOL 
93 #include "pcqueue.h"
94 PCQueue **queuePool;
95 void infi_CmiFreeDirect(void *ptr);
96 static inline void fillBufferPools();
97 #endif
98
99 #define INFIBARRIERPACKET 128
100
101 struct infiIncTokenAckPacket{
102         int a;
103 };
104
105 typedef struct {
106 char none;  
107 } CmiIdleState;
108
109
110 /********
111 ** The notify idle methods
112 ***/
113
114 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
115
116 static void CmiNotifyStillIdle(CmiIdleState *s);
117
118
119 static void CmiNotifyBeginIdle(CmiIdleState *s)
120 {
121   CmiNotifyStillIdle(s);
122 }
123
124 void CmiNotifyIdle(void) {
125   CmiNotifyStillIdle(NULL);
126 }
127
128 /***************
129 Data Structures 
130 ***********************/
131
132 /******
133         This is a header attached to the beginning of every infiniband packet
134 *******/
135 #define INFIPACKETCODE_DATA 1
136 #define INFIPACKETCODE_INCTOKENS 2
137 #define INFIRDMA_START 4
138 #define INFIRDMA_ACK 8
139 #define INFIDIRECT_REQUEST 16
140 #define INFIPACKETCODE_INCTOKENSACK 32
141 #define INFIDUMMYPACKET 64
142
143 struct infiPacketHeader{
144         char code;
145         int nodeNo;
146 #if     CMK_IBVERBS_DEBUG
147         int psn;
148 #endif  
149 };
150
151 /*
152         Types of rdma packets
153 */
154 #define INFI_MESG 1 
155 #define INFI_DIRECT 2
156
157 struct infiRdmaPacket{
158         int fromNodeNo;
159         int type;
160         struct ibv_mr key;
161         struct ibv_mr *keyPtr;
162         int remoteSize;
163         char *remoteBuf;
164         void *localBuffer;
165         OutgoingMsg ogm;
166         struct infiRdmaPacket *next,*prev;
167 };
168
169
170 /** Represents a buffer that is used to receive messages
171 */
172 #define BUFFER_RECV 1
173 #define BUFFER_RDMA 2
174 struct infiBuffer{
175         int type;
176         char *buf;
177         int size;
178         struct ibv_mr *key;
179 };
180
181
182
183 /** At the moment it is a simple pool with just a list of buffers
184         * TODO; extend it to make it an element in a linklist of pools
185 */
186 struct infiBufferPool{
187         int numBuffers;
188         struct infiBuffer *buffers;
189         struct infiBufferPool *next;
190 };
191
192 /*****
193         It is the structure for the send buffers that are used
194         to send messages to other nodes
195 ********/
196
197
198 typedef struct infiPacketStruct {       
199         char *buf;
200         int size;
201         struct infiPacketHeader header;
202         struct ibv_mr *keyHeader;
203         struct OtherNodeStruct *destNode;
204         struct infiPacketStruct *next;
205         OutgoingMsg ogm;
206         struct ibv_sge elemList[2];
207         struct ibv_send_wr wr;
208 }* infiPacket;
209
210 /*
211 typedef struct infiBufferedWCStruct{
212         struct ibv_wc wcList[WC_BUFFER_SIZE];
213         int count;
214         struct infiBufferedWCStruct *next,*prev;
215 } * infiBufferedWC;
216 */
217
218 #define BCASTLIST_SIZE 50
219
220 struct infiBufferedBcastStruct{
221         char *msg;
222         int size;
223         int broot;
224         int asm_rank;
225         int valid;
226 };
227
228 typedef struct infiBufferedBcastPoolStruct{
229         struct infiBufferedBcastPoolStruct *next,*prev;
230         struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
231         int count;
232
233 } *infiBufferedBcastPool;
234
235
236
237
238 /***
239         This structure represents the data needed by the infiniband
240         communication routines of a node
241         TODO: add locking for the smp version
242 */
243 struct infiContext {
244         struct ibv_context      *context;
245         
246         fd_set  asyncFds;
247         struct timeval tmo;
248         
249         int ibPort;
250 //      struct ibv_comp_channel *channel;
251         struct ibv_pd           *pd;
252         struct ibv_cq           *sendCq;
253         struct ibv_cq   *recvCq;
254         struct ibv_srq  *srq;
255         
256         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
257                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
258                                                                                                 //when the qps are stored in the corresponding OtherNodes
259
260         struct infiAddr *localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
261
262         infiPacket infiPacketFreeList; 
263         
264         struct infiBufferPool *recvBufferPool;
265
266         struct infiPacketHeader header;
267
268         int srqSize;
269         int sendCqSize,recvCqSize;
270         int tokensLeft;
271
272         infiBufferedBcastPool bufferedBcastList;
273         
274         struct infiRdmaPacket *bufferedRdmaAcks;
275         
276         struct infiRdmaPacket *bufferedRdmaRequests;
277 /*      infiBufferedWC infiBufferedRecvList;*/
278         
279         int insideProcessBufferedBcasts;
280 };
281
282 static struct infiContext *context = NULL;
283
284
285
286
287 /** Represents a qp used to send messages to another node
288  There is one for each remote node
289 */
290 struct infiAddr {
291         int lid,qpn,psn;
292 };
293
294 /**
295  Stored in the OtherNode structure in machine-dgram.c 
296  Store the per node data for ibverbs layer
297 */
298 enum { INFI_HEADER_DATA=21,INFI_DATA};
299
300 struct infiOtherNodeData{
301         struct ibv_qp *qp ;
302         int state;// does it expect a packet with a header (first packet) or one without
303         int totalTokens;
304         int tokensLeft;
305         int nodeNo;
306         
307         int postedRecvs;
308         int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
309 #if     CMK_IBVERBS_DEBUG       
310         int psn;
311         int recvPsn;
312 #endif  
313 };
314
315
316 /********************************
317 Memory management structures and types
318 *****************/
319
320 struct infiCmiChunkHeaderStruct;
321
322 typedef struct infiCmiChunkMetaDataStruct {
323         struct ibv_mr *key;
324         int poolIdx;
325         void *nextBuf;
326         struct infiCmiChunkHeaderStruct *owner;
327         int count;
328
329 #if THREAD_MULTI_POOL
330         int parentPe;                                           // the PE that allocated the buffer and must release it
331 #endif
332 } infiCmiChunkMetaData;
333
334
335
336
337 #define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)
338
339 typedef struct {
340         int size;//without infiCmiChunkHeader
341         void *startBuf;
342         int count;
343 } infiCmiChunkPool;
344
345 #define INFINUMPOOLS 20
346 #define INFIMAXPERPOOL 400
347 #define INFIMULTIPOOL 0xDEAFB00D
348
349 #if THREAD_MULTI_POOL
350 static infiCmiChunkPool **infiCmiChunkPools;
351 //TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
352 #else
353 static infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
354 #endif
355
356 static void initInfiCmiChunkPools();
357
358
359 static inline infiPacket newPacket(){
360         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
361         pkt->size = -1;
362         pkt->header = context->header;
363         pkt->next = NULL;
364         pkt->destNode = NULL;
365         pkt->keyHeader = METADATAFIELD(pkt)->key;
366         pkt->ogm=NULL;
367         CmiAssert(pkt->keyHeader!=NULL);
368         pkt->buf=NULL;
369         
370         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
371         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
372         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
373         
374         pkt->wr.wr_id = (uint64_t)pkt;
375         pkt->wr.sg_list = &(pkt->elemList[0]);
376         pkt->wr.num_sge = 2;
377         pkt->wr.opcode = IBV_WR_SEND;
378         pkt->wr.send_flags = IBV_SEND_SIGNALED;
379         pkt->wr.next = NULL;
380         
381         return pkt;
382 };
383
384 #define FreeInfiPacket(pkt){ \
385         pkt->size = -1;\
386         pkt->ogm=NULL;\
387         pkt->buf=NULL;\
388         pkt->next = context->infiPacketFreeList; \
389         context->infiPacketFreeList = pkt; \
390 }
391
392 #define MallocInfiPacket(pkt) { \
393         infiPacket p = context->infiPacketFreeList; \
394         if(p == NULL){ p = newPacket();} \
395                  else{context->infiPacketFreeList = p->next; } \
396         pkt = p;\
397 }
398
399
400
401 void infi_unregAndFreeMeta(void *md)
402 {
403   if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL))
404     {
405       int unregstat=ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
406       CmiAssert(unregstat==0);
407       free(((infiCmiChunkMetaData *)md));
408 #if CMK_IBVERBS_STATS
409       numUnReg++;
410       numCurReg--;
411       numMultiSendUnreg++;
412 #endif
413     }
414 }
415
416
417 /******************CmiMachineInit and its helper functions*/
418 static inline int pollSendCq(const int toBuffer);
419
420 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
421 static uint16_t getLocalLid(struct ibv_context *context, int port);
422 static int  checkQp(struct ibv_qp *qp){
423         struct ibv_qp_attr attr;
424         struct ibv_qp_init_attr init_attr;
425                  
426         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
427         if(attr.cur_qp_state != IBV_QPS_RTS){
428                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
429                 return 0;
430         }
431         return 1;
432 }
433 static void checkAllQps(){
434         int i;
435         for(i=0;i<_Cmi_numnodes;i++){
436                 if(i != _Cmi_mynode){
437                         if(!checkQp(nodes[i].infiData->qp)){
438                                 pollSendCq(0);
439                                 CmiAssert(0);
440                         }
441                 }
442         }
443 }
444
445 #if CMK_IBVERBS_FAST_START
446 static void send_partial_init();
447 #endif
448
449 static void CmiMachineInit(char **argv){
450         struct ibv_device **devList;
451         struct ibv_device *dev;
452         int ibPort;
453         int i;
454         int calcMaxSize;
455         infiPacket *pktPtrs;
456         struct infiRdmaPacket **rdmaPktPtrs;
457         int num_devices, idev;
458
459         MACHSTATE(3,"CmiMachineInit {");
460         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
461         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
462         
463         //TODO: make the device and ibport configureable by commandline parameter
464         //Check example for how to do that
465         devList =  ibv_get_device_list(&num_devices);
466         CmiAssert(num_devices > 0);
467         CmiAssert(devList != NULL);
468
469         context = (struct infiContext *)malloc(sizeof(struct infiContext));
470         MACHSTATE1(3,"context allocated %p",context);
471         
472         //localAddr will store the local addresses of all the qps
473         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
474         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
475
476         idev = 0;
477
478         // try all devices, can't assume device 0 is IB, it may be ethernet
479 loop:
480         dev = devList[idev];
481         CmiAssert(dev != NULL);
482
483         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
484
485         //the context for this infiniband device 
486         context->context = ibv_open_device(dev);
487         CmiAssert(context->context != NULL);
488
489         // test ibPort
490         int MAXPORT = 8;
491         for (ibPort = 1; ibPort < MAXPORT; ibPort++) {
492           struct ibv_port_attr attr;
493           if (ibv_query_port(context->context, ibPort, &attr) != 0) continue;
494 #if CMK_IBV_PORT_ATTR_HAS_LINK_LAYER
495           if (attr.link_layer == IBV_LINK_LAYER_INFINIBAND)  break;
496 #else
497           break;
498 #endif
499           
500         }
501         if (ibPort == MAXPORT) {
502           if (++idev == num_devices)
503             CmiAbort("No valid IB port found!");
504           else
505             goto loop;
506         }
507         context->ibPort = ibPort;
508         MACHSTATE1(3,"use port %d", ibPort);
509         
510         MACHSTATE1(3,"device opened %p",context->context);
511
512 /*      FD_ZERO(&context->asyncFds);
513         FD_SET(context->context->async_fd,&context->asyncFds);
514         context->tmo.tv_sec=0;
515         context->tmo.tv_usec=0;
516         
517         MACHSTATE(3,"asyncFds zeroed and set");*/
518
519         //protection domain
520         context->pd = ibv_alloc_pd(context->context);
521         CmiAssert(context->pd != NULL);
522         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
523
524   /******** At this point we know that this node is more or less serviceable
525         So, this is a good point for sending the partial init message for the fast
526         start case
527         Moreover, no work dependent on the number of nodes has started yet.
528         ************/
529
530 #if CMK_IBVERBS_FAST_START
531   send_partial_init();
532 #endif
533
534
535         context->header.nodeNo = _Cmi_mynode;
536
537         mtu_size=1200;
538         packetSize = mtu_size*4;
539         dataSize = packetSize-sizeof(struct infiPacketHeader);
540         
541         calcMaxSize=8000;
542 /*      if(_Cmi_numnodes*50 > calcMaxSize){
543                 calcMaxSize = _Cmi_numnodes*50;
544                 if(calcMaxSize > 10000){
545                         calcMaxSize = 10000;
546                 }
547         }*/
548 //      maxRecvBuffers=80;
549         maxRecvBuffers=calcMaxSize;
550         maxTokens = maxRecvBuffers;
551 //      maxTokens = 80;
552         context->tokensLeft=maxTokens;
553         context->qp=NULL;
554         //tokensPerProcessor=4;
555         if(_Cmi_numnodes > 1){
556 #if !CMK_IBVERBS_FAST_START
557                 /* a barrier to make sure all nodes initialized the device */
558                 ChMessage msg;
559                 ctrl_sendone_nolock("barrier",NULL,0,NULL,0);
560                 ChMessage_recv(Cmi_charmrun_fd,&msg);
561 #endif
562                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
563         }
564         
565         if (Cmi_charmrun_fd == -1) return;
566         
567         //TURN ON RDMA
568         rdma=1;
569 //      rdmaThreshold=32768;
570         rdmaThreshold=22000;
571         firstBinSize = 120;
572         CmiAssert(rdmaThreshold > firstBinSize);
573         /*      blockAllocRatio=16;
574                 blockThreshold=8;*/
575
576         blockAllocRatio=32;
577         blockThreshold=8;
578
579
580
581 #if !THREAD_MULTI_POOL
582         initInfiCmiChunkPools();
583 #endif
584
585         /*create the pool of send packets*/
586         sendPacketPoolSize = maxTokens/2;       
587         if(sendPacketPoolSize > 2000){
588                 sendPacketPoolSize = 2000;
589         }
590         
591         context->infiPacketFreeList=NULL;
592         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
593
594         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
595 #if !THREAD_MULTI_POOL
596         for(i=0;i<sendPacketPoolSize;i++){
597                 MallocInfiPacket(pktPtrs[i]);   
598         }
599
600         for(i=0;i<sendPacketPoolSize;i++){
601                 FreeInfiPacket(pktPtrs[i]);     
602         }
603         free(pktPtrs);
604 #endif
605         
606         context->bufferedBcastList=NULL;
607         context->bufferedRdmaAcks = NULL;
608         context->bufferedRdmaRequests = NULL;
609         context->insideProcessBufferedBcasts=0;
610         
611         
612         if(rdma){
613 /*              int numPkts;
614                 int k;
615                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
616                         numPkts = _Cmi_numnodes*4;
617                 }else{
618                         numPkts = maxRecvBuffers/4;
619                 }
620                 
621                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
622                 for(k=0;k<numPkts;k++){
623                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
624                 }
625                 
626                 for(k=0;k<numPkts;k++){
627                         CmiFree(rdmaPktPtrs[k]);
628                 }
629                 free(rdmaPktPtrs);*/
630         }
631         
632 /*      context->infiBufferedRecvList = NULL;*/
633 #if CMK_IBVERBS_STATS   
634         regCount =0;
635         regTime  = 0;
636
637         pktCount=0;
638         msgCount=0;
639
640         processBufferedCount=0;
641         processBufferedTime=0;
642
643         minTokensLeft = maxTokens;
644 #endif  
645
646         
647
648         MACHSTATE(3,"} CmiMachineInit");
649 }
650
651 void CmiCommunicationInit(char **argv)
652 {
653 #if THREAD_MULTI_POOL
654         initInfiCmiChunkPools();
655         fillBufferPools();
656 #endif
657 }
658
659 /*********
660         Open a qp for every processor
661 *****/
662 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
663         int myLid;
664         int i;
665         
666         
667         //find my lid
668         myLid = getLocalLid(context->context,ibPort);
669         
670         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
671
672         context->sendCqSize = maxTokens+2;
673         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
674         CmiAssert(context->sendCq != NULL);
675         
676         MACHSTATE1(3,"sendCq created %p",context->sendCq);
677         
678         
679         context->recvCqSize = maxRecvBuffers;
680         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
681         
682         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
683         CmiAssert(context->recvCq != NULL);
684         
685         //array of queue pairs
686
687         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
688
689         if(numNodes > 1)
690         {
691                 context->srqSize = (maxRecvBuffers+2);
692                 struct ibv_srq_init_attr srqAttr = {
693                         .attr = {
694                         .max_wr  = context->srqSize,
695                         .max_sge = 1
696                         }
697                 };
698                 context->srq = ibv_create_srq(context->pd,&srqAttr);
699                 CmiAssert(context->srq != NULL);
700         
701                 struct ibv_qp_init_attr initAttr = {
702                         .qp_type = IBV_QPT_RC,
703                         .send_cq = context->sendCq,
704                         .recv_cq = context->recvCq,
705                         .srq             = context->srq,
706                         .sq_sig_all = 0,
707                         .qp_context = NULL,
708                         .cap     = {
709                                 .max_send_wr  = maxTokens,
710                                 .max_send_sge = 2,
711                         },
712                 };
713                 struct ibv_qp_attr attr;
714
715                 attr.qp_state        = IBV_QPS_INIT;
716                 attr.pkey_index      = 0;
717                 attr.port_num        = ibPort;
718                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
719
720 /*              MACHSTATE1(3,"context->pd %p",context->pd);
721                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
722                 MACHSTATE1(3,"TEST QP %p",qp);*/
723
724                 for( i=1;i<numNodes;i++){
725                         int n = (myNode + i)%numNodes;
726                         if(n == myNode){
727                         }else{
728                                 localAddr[n].lid = myLid;
729                                 context->qp[n] = ibv_create_qp(context->pd,&initAttr);
730                         
731                                 MACHSTATE2(3,"qp[%d] created %p",n,context->qp[n]);
732                                 CmiAssert(context->qp[n] != NULL);
733                         
734                                 ibv_modify_qp(context->qp[n], &attr,
735                                           IBV_QP_STATE              |
736                                           IBV_QP_PKEY_INDEX         |
737                                         IBV_QP_PORT               |
738                                         IBV_QP_ACCESS_FLAGS);           
739
740                                 localAddr[n].qpn = context->qp[n]->qp_num;
741                                 localAddr[n].psn = lrand48() & 0xffffff;
742                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",n,localAddr[n].lid,localAddr[n].qpn,localAddr[n].psn);
743                         }
744                 }
745         }
746         MACHSTATE(3,"qps created");
747 };
748
749 void copyInfiAddr(ChInfiAddr *qpList){
750         int qpListIdx=0;
751         int i;
752         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
753         for(i=0;i<_Cmi_numnodes;i++){
754                 if(i == _Cmi_mynode){
755                 }else{
756                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
757                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
758                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
759                         qpListIdx++;
760                 }
761         }
762 }
763
764
765 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
766         struct ibv_port_attr attr;
767
768         if (ibv_query_port(dev_context, port, &attr))
769                 return 0;
770
771         return attr.lid;
772 }
773
774 /**************** END OF CmiMachineInit and its helper functions*/
775
776 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
777 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
778
779 /* Initial the infiniband specific data for a remote node
780         1. connect the qp and store it in and return it
781 **/
782 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
783         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
784         int err;
785         ret->state = INFI_HEADER_DATA;
786         ret->qp = context->qp[node];
787 //      ret->totalTokens = tokensPerProcessor;
788 //      ret->tokensLeft = tokensPerProcessor;
789         ret->nodeNo = node;
790 //      ret->postedRecvs = tokensPerProcessor;
791 #if     CMK_IBVERBS_DEBUG       
792         ret->psn = 0;
793         ret->recvPsn = 0;
794 #endif
795         
796         struct ibv_qp_attr attr = {
797                 .qp_state               = IBV_QPS_RTR,
798                 .path_mtu               = mtu,
799                 .dest_qp_num            = addr[1],
800                 .rq_psn                 = addr[2],
801                 .max_dest_rd_atomic     = 1,
802                 .min_rnr_timer          = 31,
803                 .ah_attr                = {
804                         .is_global      = 0,
805                         .dlid           = addr[0],
806                         .sl             = 0,
807                         .src_path_bits  = 0,
808                         .port_num       = context->ibPort
809                 }
810         };
811         
812         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
813         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
814         
815         if (err = ibv_modify_qp(ret->qp, &attr,
816           IBV_QP_STATE              |
817           IBV_QP_AV                 |
818           IBV_QP_PATH_MTU           |
819           IBV_QP_DEST_QPN           |
820           IBV_QP_RQ_PSN             |
821           IBV_QP_MAX_DEST_RD_ATOMIC |
822           IBV_QP_MIN_RNR_TIMER)) {
823                         MACHSTATE1(3,"ERROR %d",err);
824                         CmiAbort("failed to change qp state to RTR");
825         }
826
827         MACHSTATE(3,"qp state changed to RTR");
828         
829         attr.qp_state       = IBV_QPS_RTS;
830         attr.timeout        = 26;
831         attr.retry_cnt      = 20;
832         attr.rnr_retry      = 7;
833         attr.sq_psn         = context->localAddr[node].psn;
834         attr.max_rd_atomic  = 1;
835
836         
837         if (ibv_modify_qp(ret->qp, &attr,
838           IBV_QP_STATE              |
839           IBV_QP_TIMEOUT            |
840           IBV_QP_RETRY_CNT          |
841           IBV_QP_RNR_RETRY          |
842           IBV_QP_SQ_PSN             |
843           IBV_QP_MAX_QP_RD_ATOMIC)) {
844                         fprintf(stderr, "Failed to modify QP to RTS\n");
845                         exit(1);
846         }
847         MACHSTATE(3,"qp state changed to RTS");
848
849         MACHSTATE(3,"} initInfiOtherNodeData");
850         return ret;
851 }
852
853
854 void    infiPostInitialRecvs(){
855         //create the pool and post the receives
856         int numPosts;
857 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
858                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
859         }else{
860                 numPosts = maxRecvBuffers;
861         }*/
862
863         if(_Cmi_numnodes > 1){
864                 numPosts = maxRecvBuffers;
865         }else{
866                 numPosts = 0;
867         }
868         if(numPosts > 0){
869                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
870                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
871         }
872
873
874         if (context->qp) {
875           free(context->qp);
876           context->qp = NULL;
877         }
878         free(context->localAddr);
879         context->localAddr= NULL;
880 }
881
882 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
883         int numBuffers;
884         int i;
885         int bigSize;
886         char *bigBuf;
887         struct infiBufferPool *ret;
888         struct ibv_mr *bigKey;
889
890         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
891
892         page_size = sysconf(_SC_PAGESIZE);
893         ret = malloc(sizeof(struct infiBufferPool));
894         ret->next = NULL;
895         numBuffers=ret->numBuffers = numRecvs;
896         
897         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
898         
899         bigSize = numBuffers*sizePerBuffer;
900         bigBuf=malloc(bigSize);
901         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
902 #if CMK_IBVERBS_STATS
903         numCurReg++;
904         numReg++;
905 #endif
906
907         CmiAssert(bigKey != NULL);
908         
909         for(i=0;i<numBuffers;i++){
910                 struct infiBuffer *buffer =  &(ret->buffers[i]);
911                 buffer->type = BUFFER_RECV;
912                 buffer->size = sizePerBuffer;
913                 
914
915                 buffer->buf = &bigBuf[i*sizePerBuffer];
916                 buffer->key = bigKey;
917
918                 if(buffer->key == NULL){
919                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
920                         CmiAssert(buffer->key != NULL);
921                 }
922         }
923         return ret;
924 };
925
926
927
928 /**
929          Post the buffers as recv work requests
930 */
931 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
932         int j,err;
933         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
934         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
935         struct ibv_recv_wr *bad_wr;
936         
937         int startBufferIdx=0;
938         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
939         for(j=0;j<numRecvs;j++){
940                 
941                 
942                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
943                 sgElements[j].length = sizePerBuffer;
944                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
945                 
946                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
947                 workRequests[j].sg_list = &sgElements[j];
948                 workRequests[j].num_sge = 1;
949                 if(j != numRecvs-1){
950                         workRequests[j].next = &workRequests[j+1];
951                 }
952                 
953         }
954         workRequests[numRecvs-1].next = NULL;
955         MACHSTATE(3,"About to call ibv_post_srq_recv");
956         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
957                 CmiAssert(0);
958         }
959
960         free(workRequests);
961         free(sgElements);
962 }
963
964
965
966
/* toBuffer == 1: received messages are buffered, not processed */
static inline void CommunicationServer_nolock(int toBuffer);

/*
 * Machine-layer shutdown hook.
 * It does nothing unless CMK_IBVERBS_STATS is enabled. In that case it prints
 * one summary line with this node's registration, message, packet, timing
 * and token counters.
 */
void CmiMachineExit()
{
#if CMK_IBVERBS_STATS
	printf("[%d] numReg %d numUnReg %d numCurReg %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",
	       _Cmi_mynode,
	       numReg, numUnReg, numCurReg,
	       msgCount, pktCount, packetSize,
	       CmiTimer(),
	       processBufferedCount, processBufferedTime,
	       maxTokens, context->tokensLeft);
#endif
}
976 static void ServiceCharmrun_nolock();
977
978 static void CmiNotifyStillIdle(CmiIdleState *s) {
979 #if CMK_SMP
980         CmiCommLock();
981 /*      if(where == COMM_SERVER_FROM_SMP)*/
982 #endif
983 /*              ServiceCharmrun_nolock();*/
984
985         CommunicationServer_nolock(0);
986 #if CMK_SMP
987         CmiCommUnlock();
988 #endif
989 }
990
991 static inline void increaseTokens(OtherNode node);
992
993 static inline int pollRecvCq(const int toBuffer);
994 static inline int pollSendCq(const int toBuffer);
995
996
997 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
998 #if !CMK_IBVERBS_TOKENS_FLOW
999         return;
1000 #else
1001         //if(infiData->tokensLeft == 0){
1002         if(context->tokensLeft == 0){
1003                 MACHSTATE(3,"GET FREE TOKENS {{{");
1004         }else{
1005                 return;
1006         }
1007         while(context->tokensLeft == 0){
1008                 CommunicationServer_nolock(1); 
1009         }
1010         MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
1011 #endif
1012 }
1013
1014
1015 /**
1016         Packetize this data and send it
1017 **/
1018
1019
1020
1021 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
1022         int incTokens=0;
1023         int retval;
1024 #if     CMK_IBVERBS_DEBUG
1025         packet->header.psn = (++node->infiData->psn);
1026 #endif  
1027
1028
1029
1030         packet->elemList[1].addr = (uintptr_t)packet->buf;
1031         packet->elemList[1].length = size;
1032         packet->elemList[1].lkey = dataKey->lkey;
1033         
1034         
1035         packet->destNode = node;
1036         
1037 #if CMK_IBVERBS_STATS   
1038         pktCount++;
1039 #endif  
1040         
1041         getFreeTokens(node->infiData);
1042
1043 #if CMK_IBVERBS_INCTOKENS       
1044         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
1045                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
1046                 incTokens=1;
1047         }
1048 #endif
1049 /*
1050         if(!checkQp(node->infiData->qp)){
1051                 pollSendCq(1);
1052                 CmiAssert(0);
1053         }*/
1054
1055         struct ibv_send_wr *bad_wr=NULL;
1056         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1057                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
1058                 CmiAssert(0);
1059         }
1060 #if     CMK_IBVERBS_TOKENS_FLOW
1061         context->tokensLeft--;
1062 #if     CMK_IBVERBS_STATS
1063         if(context->tokensLeft < minTokensLeft){
1064                 minTokensLeft = context->tokensLeft;
1065         }
1066 #endif
1067 #endif
1068
1069 /*      if(!checkQp(node->infiData->qp)){
1070                 pollSendCq(1);
1071                 CmiAssert(0);
1072         }*/
1073
1074 #if CMK_IBVERBS_INCTOKENS       
1075         if(incTokens){
1076                 increaseTokens(node);
1077         }
1078 #endif
1079
1080
1081 #if     CMK_IBVERBS_DEBUG
1082         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1083 #else
1084         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1085 #endif
1086
1087 };
1088
1089
1090 static void inline EnqueueDummyPacket(OtherNode node,int size){
1091         infiPacket packet;
1092         MallocInfiPacket(packet);
1093         packet->size = size;
1094         packet->buf = CmiAlloc(size);
1095         
1096         packet->header.code = INFIDUMMYPACKET;
1097
1098         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1099         
1100         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1101         EnqueuePacket(node,packet,size,key);
1102 }
1103
1104
1105
1106
1107
1108
1109 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1110         infiPacket packet;
1111         MallocInfiPacket(packet);
1112         packet->size = size;
1113         packet->buf=data;
1114         
1115         //the nodeNo is added at time of packet allocation
1116         packet->header.code = INFIPACKETCODE_DATA;
1117         
1118         ogm->refcount++;
1119         packet->ogm = ogm;
1120         
1121         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1122         CmiAssert(key != NULL);
1123         
1124         EnqueuePacket(node,packet,size,key);
1125 };
1126
1127 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1128 static inline void processAllBufferedMsgs();
1129
1130 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1131         int size; char *data;
1132 //      processAllBufferedMsgs();
1133
1134         
1135         ogm->refcount++;
1136   size = ogm->size;
1137   data = ogm->data;
1138
1139 #if CMK_IBVERBS_STATS   
1140         msgCount++;
1141 #endif
1142
1143         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1144         //First packet has dgram header, other packets dont
1145         
1146   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1147         
1148         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1149
1150         if(rdma && size > rdmaThreshold){
1151                         EnqueueRdmaPacket(ogm,node);
1152         }else{
1153         
1154                 while(size > dataSize){
1155                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1156                         size -= dataSize;
1157                         data += dataSize;
1158                 }
1159                 if(size > 0){
1160                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1161                 }
1162         }
1163 //#if   !CMK_SMP
1164         processAllBufferedMsgs();
1165 //#endif
1166         ogm->refcount--;
1167         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1168 }
1169
1170
1171 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1172         infiPacket packet;
1173
1174         ogm->refcount++;
1175         
1176         MallocInfiPacket(packet);
1177  
1178  {
1179                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1180
1181                 
1182                 packet->size = sizeof(struct infiRdmaPacket);
1183                 packet->buf = (char *)rdmaPacket;
1184                 
1185                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1186
1187                 CmiAssert(key!=NULL);
1188
1189                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1190                 
1191                 packet->header.code = INFIRDMA_START;
1192                 packet->header.nodeNo = _Cmi_mynode;
1193                 packet->ogm = NULL;
1194
1195                 rdmaPacket->type = INFI_MESG;
1196                 rdmaPacket->ogm = ogm;
1197                 rdmaPacket->key = *key;
1198                 rdmaPacket->keyPtr = key;
1199                 rdmaPacket->remoteBuf = ogm->data;
1200                 rdmaPacket->remoteSize = ogm->size;
1201                 
1202                 
1203                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1204                 
1205                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1206                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1207         }
1208 }
1209
1210
1211
static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
static inline void processSendWC(struct ibv_wc *sendWC);
static unsigned int _count=0;
/* errno may be a macro (for example thread-local in glibc). Declaring it by
   hand as `extern int errno` is not portable, so take it from <errno.h>;
   the include guard makes this a no-op if the header is already included. */
#include <errno.h>
static int _countAsync=0;
1217 static inline void processAsyncEvents(){
1218         struct ibv_async_event event;
1219         int ready;
1220         _countAsync++;
1221         if(_countAsync < 1){
1222                 return;
1223         }
1224         _countAsync=0;
1225         FD_SET(context->context->async_fd,&context->asyncFds);
1226         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1227         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1228         
1229         if(ready==0){
1230                 return;
1231         }
1232         if(ready == -1){
1233 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1234                 return;
1235         }
1236         
1237         if (ibv_get_async_event(context->context, &event)){
1238                 return;
1239                 CmiAbort("get async event failed");
1240         }
1241         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1242         ibv_ack_async_event(&event);
1243
1244         
1245 }
1246
1247 static void pollCmiDirectQ();
1248
1249 static inline  void CommunicationServer_nolock(int toBuffer) {
1250         int processed;
1251         if(_Cmi_numnodes <= 1){
1252                 pollCmiDirectQ();
1253                 return;
1254         }
1255         MACHSTATE(2,"CommServer_nolock{");
1256         
1257 //      processAsyncEvents();
1258         
1259 //      checkAllQps();
1260
1261         pollCmiDirectQ();
1262
1263         processed = pollRecvCq(toBuffer);
1264         
1265
1266         processed += pollSendCq(toBuffer);
1267         
1268         if(toBuffer == 0){
1269 //              if(processed != 0)
1270                         processAllBufferedMsgs();
1271         }
1272         
1273 //      checkAllQps();
1274 //      _count--;
1275
1276         MACHSTATE(2,"} CommServer_nolock ne");
1277         
1278 }
1279 /*
1280 static inline infiBufferedWC createInfiBufferedWC(){
1281         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1282         ret->count = 0;
1283         ret->next = ret->prev =NULL;
1284         return ret;
1285 }*/
1286
1287 /****
1288         The buffered recvWC are stored in a doubly linked list of 
1289         arrays or blocks of wcs.
1290         To keep the average insert cost low, a new block is added 
1291         to the top of the list. (resulting in a reverse seq of blocks)
1292         Within a block however wc are stored in a sequence
1293 *****/
1294 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1295         infiBufferedWC block;
1296         MACHSTATE(3,"Insert Buffered Recv called");
1297         if( context->infiBufferedRecvList ==NULL){
1298                 context->infiBufferedRecvList = createInfiBufferedWC();
1299                 block = context->infiBufferedRecvList;
1300         }else{
1301                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1302                         block = createInfiBufferedWC();
1303                         context->infiBufferedRecvList->prev = block;
1304                         block->next = context->infiBufferedRecvList;
1305                         context->infiBufferedRecvList = block;
1306                 }else{
1307                         block = context->infiBufferedRecvList;
1308                 }
1309         }
1310         
1311         block->wcList[block->count] = *wc;
1312         block->count++;
1313 };
1314 */
1315
1316
1317 /********
1318 go through the blocks of bufferedWC. Process the last block first.
1319 Then the next one and so on. (Processing within a block should happen
1320 in sequence).
1321 Leave the last block in place to avoid having to allocate again
1322 ******/
1323 /*static inline void processBufferedRecvList(){
1324         infiBufferedWC start;
1325         start = context->infiBufferedRecvList;
1326         while(start->next != NULL){
1327                 start = start->next;
1328         }
1329         while(start != NULL){
1330                 int i=0;
1331                 infiBufferedWC tmp;
1332                 for(i=0;i<start->count;i++){
1333                         processRecvWC(&start->wcList[i]);
1334                 }
1335                 if(start != context->infiBufferedRecvList){
1336                         //not the first one
1337                         tmp = start;
1338                         start = start->prev;
1339                         free(tmp);
1340                         start->next = NULL;
1341                 }else{
1342                         start = start->prev;
1343                 }
1344         }
1345         context->infiBufferedRecvList->next = NULL;
1346         context->infiBufferedRecvList->prev = NULL;
1347         context->infiBufferedRecvList->count = 0;
1348 }
1349 */
1350
1351 static inline int pollRecvCq(const int toBuffer){
1352         int i;
1353         int ne;
1354         struct ibv_wc wc[WC_LIST_SIZE];
1355         
1356         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1357         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1358 //      CmiAssert(ne >=0);
1359         
1360         if(ne != 0){
1361                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1362         }
1363         
1364         for(i=0;i<ne;i++){
1365                 if(wc[i].status != IBV_WC_SUCCESS){
1366                         CmiAssert(0);
1367                 }
1368                 switch(wc[i].opcode){
1369                         case IBV_WC_RECV:
1370                                         processRecvWC(&wc[i],toBuffer);
1371                                 break;
1372                         default:
1373                                 CmiAbort("Wrong type of work completion object in recvq");
1374                                 break;
1375                 }
1376                         
1377         }
1378         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1379         return ne;
1380
1381 }
1382
1383 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1384
1385 static inline int pollSendCq(const int toBuffer){
1386         int i;
1387         int ne;
1388         struct ibv_wc wc[WC_LIST_SIZE];
1389
1390         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1391 //      CmiAssert(ne >=0);
1392         
1393         
1394         for(i=0;i<ne;i++){
1395                 if(wc[i].status != IBV_WC_SUCCESS){
1396                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1397 #if CMK_IBVERBS_STATS
1398         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1399 #endif
1400                         CmiAssert(0);
1401                 }
1402                 switch(wc[i].opcode){
1403                         case IBV_WC_SEND:{
1404                                 //message received
1405                                 processSendWC(&wc[i]);
1406                                 
1407                                 break;
1408                                 }
1409                         case IBV_WC_RDMA_READ:
1410                         {
1411 //                              processRdmaWC(&wc[i],toBuffer);
1412                                         processRdmaWC(&wc[i],1);
1413                                 break;
1414                         }
1415                         case IBV_WC_RDMA_WRITE:
1416                         {
1417                                 /*** used for CmiDirect puts 
1418                                 Nothing needs to be done on the sender side once send is done **/
1419                                 break;
1420                         }
1421                         default:
1422                                 CmiAbort("Wrong type of work completion object in recvq");
1423                                 break;
1424                 }
1425                         
1426         }
1427         return ne;
1428 }
1429
1430
1431 /******************
1432 Check the communication server socket and
1433
1434 *****************/
1435 int CheckSocketsReady(int withDelayMs)
1436 {   
1437   int nreadable;
1438   CMK_PIPE_DECL(withDelayMs);
1439
1440   CmiStdoutAdd(CMK_PIPE_SUB);
1441   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1442
1443   nreadable=CMK_PIPE_CALL();
1444   ctrlskt_ready_read = 0;
1445   dataskt_ready_read = 0;
1446   dataskt_ready_write = 0;
1447   
1448   if (nreadable == 0) {
1449     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1450     return nreadable;
1451   }
1452   if (nreadable==-1) {
1453     CMK_PIPE_CHECKERR();
1454     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1455     return CheckSocketsReady(0);
1456   }
1457   CmiStdoutCheck(CMK_PIPE_SUB);
1458   if (Cmi_charmrun_fd!=-1) 
1459           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1460   MACHSTATE(1,"} CheckSocketsReady")
1461   return nreadable;
1462 }
1463
1464
1465 /*** Service the charmrun socket
1466 *************/
1467
1468 static void ServiceCharmrun_nolock()
1469 {
1470   int again = 1;
1471   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1472   while (again)
1473   {
1474   again = 0;
1475   CheckSocketsReady(0);
1476   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1477   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1478   }
1479   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1480 }
1481
1482
1483
1484 static void CommunicationServer(int sleepTime, int where){
1485         if( where == COMM_SERVER_FROM_INTERRUPT){
1486 #if CMK_IMMEDIATE_MSG
1487                 CmiHandleImmediate();
1488 #endif
1489                 return;
1490         }
1491 #if CMK_SMP
1492         if(where == COMM_SERVER_FROM_WORKER){
1493                 return;
1494         }
1495         if(where == COMM_SERVER_FROM_SMP){
1496 #endif
1497                 ServiceCharmrun_nolock();
1498 #if CMK_SMP
1499         }
1500         CmiCommLock();
1501 #endif
1502         CommunicationServer_nolock(0);
1503 #if CMK_SMP
1504         CmiCommUnlock();
1505 #endif
1506
1507         /* when called by communication thread or in interrupt */
1508 #if CMK_IMMEDIATE_MSG
1509         if (where == COMM_SERVER_FROM_SMP) {
1510                 CmiHandleImmediate();
1511         }
1512 #endif
1513 }
1514
1515
static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);


void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);

/**
 * Reassemble a message from the packets received from node nodeNo.
 *
 * Per-sender state is kept in nodes[nodeNo]: asm_msg, asm_total, asm_fill,
 * asm_rank and infiData->state / infiData->broot.
 *  - INFI_HEADER_DATA: the packet starts a new message. Its datagram header
 *    is decoded, a buffer of the full message length (CmiMsgHeaderGetLength)
 *    is CmiAlloc'd and the packet is copied into it.
 *  - INFI_DATA: the packet is appended to the message being assembled. Every
 *    non-final continuation packet must be exactly dataSize bytes, and
 *    overflowing the announced length aborts.
 * Once the message is complete it is passed to handoverMessage with
 * toBuffer = 1, whatever this function's toBuffer argument is.
 * NOTE(review): toBuffer is otherwise unused here; confirm this is intended.
 *
 * @param nodeNo    index of the sending node in nodes[]
 * @param len       payload bytes in this packet (packet header excluded)
 * @param msg       packet payload; it is copied, not retained
 * @param toBuffer  not used (see above)
 */
static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
        char *newmsg;
        
        MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
        
        OtherNode node = &nodes[nodeNo];
        newmsg = node->asm_msg;         
        
        /// This simple state machine determines if this packet marks the beginning of a new message
        // from another node, or if this is another in a sequence of packets
        switch(node->infiData->state){
                case INFI_HEADER_DATA:
                {
                        int size;
                        /* srcpe, seqno, magic (and i) are filled in by
                           DgramHeaderBreak but not otherwise used here */
                        int rank, srcpe, seqno, magic, i;
                        unsigned int broot;
                        DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
                        size = CmiMsgHeaderGetLength(msg);
                        MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
//                      CmiAssert(size > 0);
//                      CmiAssert(nodes_by_pe[srcpe] == node);
                        
//                      CmiAssert(newmsg == NULL);
                        /* the first packet may not be longer than the whole message */
                        if(len > size){
                                CmiPrintf("size: %d, len:%d.\n", size, len);
                                CmiAbort("\n\n\t\tLength mismatch!!\n\n");
                        }
                        newmsg = (char *)CmiAlloc(size);
      _MEMCHECK(newmsg);
      memcpy(newmsg, msg, len);
      node->asm_rank = rank;
      node->asm_total = size;
      node->asm_fill = len;
      node->asm_msg = newmsg;
                        node->infiData->broot = broot;
                        if(len == size){
                                //this is the only packet for this message 
                                node->infiData->state = INFI_HEADER_DATA;
                        }else{
                                //there are more packets following
                                node->infiData->state = INFI_DATA;
                        }
                        break;
                }
                case INFI_DATA:
                {
                        /* a packet that does not finish the message must be full-sized */
                        if(node->asm_fill + len < node->asm_total && len != dataSize){
                                CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
                                CmiAbort("packet in the middle does not have expected length");
                        }
                        /* never write past the announced message length */
                        if(node->asm_fill+len > node->asm_total){
                                CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
                                CmiAbort("\n\n\t\tLength mismatch!!\n\n");
                        }
                        //TODO (original author): remove this (presumably this copy)
                        memcpy(newmsg + node->asm_fill,msg,len);
                        node->asm_fill += len;
                        if(node->asm_fill == node->asm_total){
                                node->infiData->state = INFI_HEADER_DATA;
                        }else{
                                node->infiData->state = INFI_DATA;
                        }
                        break;
                }
        }
        /// if this packet was the last packet in a message, i.e. the state was
        /// reset to INFI_HEADER_DATA, hand the completed message over
        
        if(node->infiData->state == INFI_HEADER_DATA){
                int total_size = node->asm_total;
                node->asm_msg = NULL;
                handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
                MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
        }
        
};
1597
1598 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
1599 #if CMK_BROADCAST_SPANNING_TREE
1600         if (rank == DGRAM_BROADCAST
1601 #if CMK_NODE_QUEUE_AVAILABLE
1602           || rank == DGRAM_NODEBROADCAST
1603 #endif
1604          ){
1605                  if(toBuffer){
1606                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1607                 }else{
1608                         SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
1609                 }
1610         }
1611 #elif CMK_BROADCAST_HYPERCUBE
1612         if (rank == DGRAM_BROADCAST
1613 #if CMK_NODE_QUEUE_AVAILABLE
1614           || rank == DGRAM_NODEBROADCAST
1615 #endif
1616          ){
1617                  if(toBuffer){
1618                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1619                  }else{
1620                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
1621                 }
1622         }
1623 #endif
1624
1625         switch (rank) {
1626         case DGRAM_BROADCAST: {
1627                                 int i;
1628                                 for (i=1; i<_Cmi_mynodesize; i++){
1629                                         CmiPushPE(i, CopyMsg(newmsg, total_size));
1630                                 }
1631           CmiPushPE(0, newmsg);
1632           break;
1633       }
1634 #if CMK_NODE_QUEUE_AVAILABLE
1635         case DGRAM_NODEBROADCAST: 
1636         case DGRAM_NODEMESSAGE: {
1637           CmiPushNode(newmsg);
1638           break;
1639         }
1640 #endif
1641         default:
1642                                 {
1643                                         
1644           CmiPushPE(rank, newmsg);
1645                                 }
1646         }    /* end of switch */
1647                 if(!toBuffer){
1648 //#if !CMK_SMP          
1649                 processAllBufferedMsgs();
1650 //#endif
1651                 }
1652 }
1653
1654
1655 static inline void increasePostedRecvs(int nodeNo);
1656 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1657 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1658
1659 //struct infiDirectRequestPacket;
1660 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1661
1662 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1663         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1664         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1665         int nodeNo = header->nodeNo;
1666 #if     CMK_IBVERBS_DEBUG
1667         OtherNode node = &nodes[nodeNo];
1668 #endif
1669         
1670         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1671 #if     CMK_IBVERBS_DEBUG
1672         /*
1673         if(node->infiData->recvPsn == 0){
1674                 node->infiData->recvPsn = header->psn;
1675         }else{
1676                         CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1677            *            node->infiData->recvPsn++; 
1678         }
1679         */
1680         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1681 #else
1682         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1683 #endif
1684         
1685         if(header->code & INFIPACKETCODE_DATA){
1686                         
1687                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1688         }
1689         if(header->code & INFIDUMMYPACKET){
1690                 MACHSTATE(3,"Dummy packet");
1691         }
1692         if(header->code & INFIBARRIERPACKET){
1693                 MACHSTATE(3,"Barrier packet");
1694                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1695         }
1696
1697 #if CMK_IBVERBS_INCTOKENS       
1698         if(header->code & INFIPACKETCODE_INCTOKENS){
1699                 increasePostedRecvs(nodeNo);
1700         }
1701 #endif  
1702         if(rdma && header->code & INFIRDMA_START){
1703                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1704 //              if(toBuffer){
1705                         //TODO: make a function of this and use for both acks and requests
1706                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1707                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1708                         *copyPacket = *rdmaPacket;
1709                         copyPacket->fromNodeNo = nodeNo;
1710                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1711                         context->bufferedRdmaRequests = copyPacket;
1712                         copyPacket->next = tmp;
1713                         copyPacket->prev = NULL;
1714                         if(tmp != NULL){
1715                                 tmp->prev = copyPacket;
1716                         }
1717 /*              }else{
1718                         processRdmaRequest(rdmaPacket,nodeNo,0);
1719                 }*/
1720         }
1721         if(rdma && header->code & INFIRDMA_ACK){
1722                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1723                 processRdmaAck(rdmaPacket);
1724         }
1725 /*      if(header->code & INFIDIRECT_REQUEST){
1726                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1727                 processDirectRequest(directRequestPacket);
1728         }*/
1729         {
1730                 struct ibv_sge list = {
1731                         .addr   = (uintptr_t) buffer->buf,
1732                         .length = buffer->size,
1733                         .lkey   = buffer->key->lkey
1734                 };
1735         
1736                 struct ibv_recv_wr wr = {
1737                         .wr_id = (uint64_t)buffer,
1738                         .sg_list = &list,
1739                         .num_sge = 1,
1740                         .next = NULL
1741                 };
1742                 struct ibv_recv_wr *bad_wr;
1743         
1744                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1745                         CmiAssert(0);
1746                 }
1747         }
1748
1749 };
1750
1751
1752
1753
1754 static inline  void processSendWC(struct ibv_wc *sendWC){
1755
1756         infiPacket packet = (infiPacket )sendWC->wr_id;
1757 #if CMK_IBVERBS_TOKENS_FLOW
1758 //      packet->destNode->infiData->tokensLeft++;
1759         context->tokensLeft++;
1760 #endif
1761
1762         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1763         if(packet->ogm != NULL){
1764                 packet->ogm->refcount--;
1765                 if(packet->ogm->refcount == 0){
1766                         GarbageCollectMsg(packet->ogm); 
1767                 }
1768         }else{
1769                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1770                    if (packet->buf) CmiFree(packet->buf);  /* gzheng */
1771                 }
1772         }
1773
1774         FreeInfiPacket(packet);
1775 };
1776
1777
1778
1779 /********************************************************************/
1780 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1781         int nodeNo = fromNodeNo;
1782         OtherNode node = &nodes[nodeNo];
1783         struct infiRdmaPacket *rdmaPacket;
1784
1785         getFreeTokens(node->infiData);
1786 #if CMK_IBVERBS_TOKENS_FLOW
1787 //      node->infiData->tokensLeft--;
1788         context->tokensLeft--;
1789 #if     CMK_IBVERBS_STATS
1790         if(context->tokensLeft < minTokensLeft){
1791                 minTokensLeft = context->tokensLeft;
1792         }
1793 #endif
1794 #endif
1795         
1796         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1797 //      CmiAssert(buffer != NULL);
1798         
1799         
1800         if(isBuffered){
1801                 rdmaPacket = _rdmaPacket;
1802         }else{
1803                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1804                 *rdmaPacket = *_rdmaPacket;
1805         }
1806
1807
1808         rdmaPacket->fromNodeNo = fromNodeNo;
1809         rdmaPacket->localBuffer = (void *)buffer;
1810         
1811         buffer->type = BUFFER_RDMA;
1812         buffer->size = rdmaPacket->remoteSize;
1813         
1814         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1815 //      CmiAssert(buffer->buf != NULL);
1816
1817         buffer->key = METADATAFIELD(buffer->buf)->key;
1818
1819         
1820         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1821         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1822 //      CmiAssert(buffer->key != NULL);
1823         
1824         {
1825                 struct ibv_sge list = {
1826                         .addr = (uintptr_t )buffer->buf,
1827                         .length = buffer->size,
1828                         .lkey   = buffer->key->lkey
1829                 };
1830
1831                 struct ibv_send_wr *bad_wr;
1832                 struct ibv_send_wr wr = {
1833                         .wr_id = (uint64_t )rdmaPacket,
1834                         .sg_list = &list,
1835                         .num_sge = 1,
1836                         .opcode = IBV_WR_RDMA_READ,
1837                         .send_flags = IBV_SEND_SIGNALED,
1838                         .wr.rdma = {
1839                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1840                                 .rkey = rdmaPacket->key.rkey
1841                         }
1842                 };
1843                 /** post and rdma_read that is a rdma get*/
1844                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1845                         CmiAssert(0);
1846                 }
1847         }
1848
1849 };
1850
/* Forward declarations. EnqueueRdmaAck is called by processRdmaWC below;
   processDirectWC is referenced only from commented-out code in processRdmaWC. */
static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1853
1854 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1855                 //rdma get done
1856 #if CMK_IBVERBS_STATS
1857         double _startRegTime;
1858 #endif  
1859
1860         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1861 /*      if(rdmaPacket->type == INFI_DIRECT){
1862                 processDirectWC(rdmaPacket);
1863                 return;
1864         }*/
1865 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1866         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1867
1868         /*TODO: remove this
1869         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1870         
1871 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1872         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1873         
1874         {
1875                 int size;
1876                 int rank, srcpe, seqno, magic, i;
1877                 unsigned int broot;
1878                 char *msg = buffer->buf;
1879                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1880                 size = CmiMsgHeaderGetLength(msg);
1881 /*              CmiAssert(size == buffer->size);*/
1882                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1883         }
1884         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1885
1886         
1887         free(buffer);
1888         
1889         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1890         //we are sending this ack as a response to a successful
1891         // rdma_Read.. the token for that rdma_Read needs to be freed
1892 #if CMK_IBVERBS_TOKENS_FLOW     
1893         //node->infiData->tokensLeft++;
1894         context->tokensLeft++;
1895 #endif
1896
1897         //send ack to sender if toBuffer is off otherwise buffer it
1898         if(toBuffer){
1899                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1900                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1901                 context->bufferedRdmaAcks = rdmaPacket;
1902                 rdmaPacket->next = tmp;
1903                 rdmaPacket->prev = NULL;
1904                 if(tmp != NULL){
1905                         tmp->prev = rdmaPacket; 
1906                 }
1907         }else{
1908                 EnqueueRdmaAck(rdmaPacket);             
1909                 free(rdmaPacket);
1910         }
1911 }
1912
1913 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1914         infiPacket packet;
1915         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1916
1917         
1918         MallocInfiPacket(packet);
1919         {
1920                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1921                 *ackPacket = *rdmaPacket;
1922                 packet->size = sizeof(struct infiRdmaPacket);
1923                 packet->buf = (char *)ackPacket;
1924                 packet->header.code = INFIRDMA_ACK;
1925                 packet->ogm=NULL;
1926                 
1927                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1928         
1929
1930                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1931         }
1932 };
1933
1934
1935 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1936         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1937         rdmaPacket->ogm->refcount--;
1938         GarbageCollectMsg(rdmaPacket->ogm);
1939 }
1940
1941
1942 /****************************
1943  Deal with all the buffered (delayed) messages
1944  such as processing recvd broadcasts, sending
1945  rdma acks and processing recvd rdma requests
1946 ******************************/
1947
1948
1949 static inline infiBufferedBcastPool createBcastPool(){
1950         int i;
1951         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1952         ret->count = 0;
1953         ret->next = ret->prev = NULL;   
1954         for(i=0;i<BCASTLIST_SIZE;i++){
1955                 ret->bcastList[i].valid = 0;
1956         }
1957         return ret;
1958 };
1959 /****
1960         The buffered bcast messages are stored in a doubly linked list of 
1961         arrays or blocks.
1962         To keep the average insert cost low, a new block is added 
1963         to the top of the list. (resulting in a reverse seq of blocks)
1964         Within a block however bcast are stored in increasing order sequence
1965 *****/
1966
1967 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1968         if(context->bufferedBcastList == NULL){
1969                 context->bufferedBcastList = createBcastPool();
1970         }else{
1971                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1972                         infiBufferedBcastPool tmp;
1973                         tmp = createBcastPool();
1974                         context->bufferedBcastList->prev = tmp;
1975                         tmp->next = context->bufferedBcastList;
1976                         context->bufferedBcastList = tmp;
1977                 }
1978         }
1979         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1980         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1981         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1982         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1983         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1984         
1985         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1986         
1987         context->bufferedBcastList->count++;
1988 }
1989
1990 /*********
1991         Go through the blocks of buffered bcast messages. process last block first
1992         processign within a block is in sequence though
1993 *********/
/*
 * Forwards every buffered broadcast to this node's children in the broadcast
 * tree (SendSpanningChildren or SendHypercube, depending on the build),
 * CmiFree's each forwarded message, frees all list blocks and resets
 * context->bufferedBcastList to NULL.
 *
 * context->insideProcessBufferedBcasts guards against re-entry: a nested call
 * made while forwarding returns immediately.
 * Blocks are walked from the tail (oldest) toward the head via ->prev. The
 * loop re-reads start->count and start->prev at each step, so entries or
 * blocks added by insertBufferedBcast() during forwarding appear to be
 * processed as well before the list is reset — TODO confirm callers rely on it.
 * NOTE(review): an entry whose asm_rank is not a broadcast tag (or any entry
 * in a build with neither CMK_BROADCAST_SPANNING_TREE nor
 * CMK_BROADCAST_HYPERCUBE) is marked invalid and dropped without freeing msg.
 */
static inline void processBufferedBcast(){
        infiBufferedBcastPool start;

        if(context->bufferedBcastList == NULL){
                return;
        }
        start = context->bufferedBcastList;
        /* already forwarding further up the stack: let that call finish */
        if(context->insideProcessBufferedBcasts==1){
                return;
        }
        context->insideProcessBufferedBcasts=1;

        /* seek to the tail: the oldest block */
        while(start->next != NULL){
                start = start->next;
        }
        
        while(start != NULL){
                int i=0;
                infiBufferedBcastPool tmp;
                if(start->count != 0){
                        MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
                }
                /* entries within a block are in arrival order */
                for(i=0;i<start->count;i++){
                        if(start->bcastList[i].valid == 0){
                                continue;
                        }
                        start->bcastList[i].valid=0;
                        MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
#if CMK_BROADCAST_SPANNING_TREE
        if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
                CmiFree(start->bcastList[i].msg);           /* gzheng */
                                        }
#elif CMK_BROADCAST_HYPERCUBE
        if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
#if CMK_NODE_QUEUE_AVAILABLE
          || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
#endif
         ){
                SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
                CmiFree(start->bcastList[i].msg);           /* gzheng */
                                        }
#endif
                }
                if(start->count != 0){
                        MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
                }
                
                /* step toward the head (newer blocks), releasing this block */
                tmp = start;
                start = start->prev;
                free(tmp);
                if(start != NULL){
                        //not the first one
                        start->next = NULL;
                }
        }

        context->bufferedBcastList = NULL;
/*      context->bufferedBcastList->prev = NULL;
        context->bufferedBcastList->count =0;   */
        context->insideProcessBufferedBcasts=0;
        MACHSTATE(2,"processBufferedBcast done ");
};
2061
2062
/*
 * Sends every RDMA ack that processRdmaWC() deferred onto
 * context->bufferedRdmaAcks, oldest first. Each buffered packet is freed after
 * EnqueueRdmaAck (which sends a copy), and the list is then reset to NULL.
 *
 * Order matters: start->prev is read only AFTER EnqueueRdmaAck returns.
 * Suppose that call leads to processRdmaWC buffering more acks. Those are
 * pushed at the head, and the old head's prev is linked to them, so the same
 * backward walk still reaches them. Do not hoist the prev read.
 */
static inline void processBufferedRdmaAcks(){
        struct infiRdmaPacket *start = context->bufferedRdmaAcks;
        if(start == NULL){
                return;
        }
        /* seek to the tail: the oldest buffered ack */
        while(start->next != NULL){
                start = start->next;
        }
        while(start != NULL){
                struct infiRdmaPacket *rdmaPacket=start;
                MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
                EnqueueRdmaAck(rdmaPacket);
                start = start->prev;
                free(rdmaPacket);
        }
        context->bufferedRdmaAcks=NULL;
}
2080
2081
2082
2083 static inline void processBufferedRdmaRequests(){
2084         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2085         if(start == NULL){
2086                 return;
2087         }
2088         
2089         
2090         while(start->next != NULL){
2091                 start = start->next;
2092         }
2093         while(start != NULL){
2094                 struct infiRdmaPacket *rdmaPacket=start;
2095                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2096                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2097                 start = start->prev;
2098         }
2099         
2100         context->bufferedRdmaRequests=NULL;
2101 }
2102
2103
2104
2105
2106
/**
 * Flushes all deferred work in a fixed order: buffered broadcasts, then
 * buffered RDMA acks, then buffered RDMA requests.
 * With CMK_IBVERBS_STATS, it also counts the calls in processBufferedCount and
 * adds the elapsed wall time to processBufferedTime.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	/* named to avoid shadowing the file-scope _startTime */
	double flushStart = CmiWallTimer();
	processBufferedCount++;
#endif
	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();
#if CMK_IBVERBS_STATS
	processBufferedTime += CmiWallTimer() - flushStart;
#endif
}
2120
2121
2122 /*************************
2123         Increase tokens when short of them
2124 **********/
2125 static inline void increaseTokens(OtherNode node){
2126         int err;
2127         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2128         if(node->infiData->totalTokens + increase > maxTokens){
2129                 increase = maxTokens-node->infiData->totalTokens;
2130         }
2131         node->infiData->totalTokens += increase;
2132         node->infiData->tokensLeft += increase;
2133         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2134         //increase the size of the sendCq
2135         int currentCqSize = context->sendCqSize;
2136         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2137                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2138                 CmiAssert(0);
2139         }
2140         context->sendCqSize+= increase;
2141 };
2142
2143 static void increasePostedRecvs(int nodeNo){
2144         OtherNode node = &nodes[nodeNo];
2145         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2146         int recvIncrease = tokenIncrease;
2147         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2148                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2149         }
2150         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2151                 recvIncrease = maxRecvBuffers-context->srqSize;
2152         }
2153         node->infiData->postedRecvs+= recvIncrease;
2154         context->srqSize += recvIncrease;
2155         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2156         //increase the size of the recvCq
2157         int currentCqSize = context->recvCqSize;
2158         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2159                 CmiAssert(0);
2160         }
2161         context->recvCqSize += tokenIncrease;
2162         if(recvIncrease > 0){
2163                 //create another bufferPool and attach it to the top of the current one
2164                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2165                 newPool->next = context->recvBufferPool;
2166                 context->recvBufferPool = newPool;
2167                 postInitialRecvs(newPool,recvIncrease,packetSize);
2168         }
2169
2170 };
2171
2172
2173
2174
2175 /*********************************************
2176         Memory management routines for RDMA
2177
2178 ************************************************/
2179
2180 /**
2181         There are INFINUMPOOLS of memory.
2182         The first pool is of size firstBinSize.
2183         The ith pool is of size firstBinSize*2^i
2184 */
2185
2186 static void initInfiCmiChunkPools(){
2187         int i,j;
2188         int size = firstBinSize;
2189         int nodeSize;
2190
2191 #if THREAD_MULTI_POOL
2192         nodeSize = CmiMyNodeSize() + 1;
2193         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2194         for(i = 0; i < nodeSize; i++){
2195                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2196         }
2197         for(j = 0; j < nodeSize; j++){
2198                 size = firstBinSize;
2199                 for(i=0;i<INFINUMPOOLS;i++){
2200                         infiCmiChunkPools[j][i].size = size;
2201                         infiCmiChunkPools[j][i].startBuf = NULL;
2202                         infiCmiChunkPools[j][i].count = 0;
2203                         size *= 2;
2204                 }
2205         }
2206
2207         // creating the n^2 system of queues
2208         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2209         for(i = 0; i < nodeSize; i++){
2210                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2211         }
2212         for(i = 0; i < nodeSize; i++)
2213                 for(j = 0; j < nodeSize; j++)
2214                         queuePool[i][j] = PCQueueCreate();
2215
2216 #else
2217
2218         size = firstBinSize;    
2219         for(i=0;i<INFINUMPOOLS;i++){
2220                 infiCmiChunkPools[i].size = size;
2221                 infiCmiChunkPools[i].startBuf = NULL;
2222                 infiCmiChunkPools[i].count = 0;
2223                 size *= 2;
2224         }
2225 #endif
2226
2227 }
2228
2229 /***********
2230 Register memory for a part of a received multisend message
2231 *************/
2232 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2233         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2234         char *res=msg-sizeof(infiCmiChunkHeader);
2235         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2236 #if CMK_IBVERBS_STATS
2237         numCurReg++;
2238         numReg++;
2239         numMultiSend++;
2240 #endif
2241         CmiAssert(metaData->key!=NULL);
2242         metaData->owner = NULL;
2243         metaData->poolIdx = INFIMULTIPOOL;
2244
2245         return metaData;
2246 };
2247
2248
2249 #if THREAD_MULTI_POOL
2250
2251 // Fills up the buffer pools for every thread in the node
2252 static inline void fillBufferPools(){
2253         int nodeSize, poolIdx, thread;
2254         infiCmiChunkMetaData *metaData;         
2255         infiCmiChunkHeader *hdr;
2256         int allocSize;
2257         int count=1;
2258         int i;
2259         struct ibv_mr *key;
2260         void *res;
2261
2262         // initializing values
2263         nodeSize = CmiMyNodeSize() + 1;
2264
2265         // iterating over all threads and all pools
2266         for(thread = 0; thread < nodeSize; thread++){
2267                 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2268                         allocSize = infiCmiChunkPools[thread][poolIdx].size;
2269                         if(poolIdx < blockThreshold){
2270                                 count = blockAllocRatio;
2271                         }else{
2272                                 count = 1;
2273                         }
2274                         res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2275                         hdr = res;
2276                         key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2277                         CmiAssert(key != NULL);
2278 #if CMK_IBVERBS_STATS
2279                 numCurReg++;
2280                 numReg++;
2281 #endif
2282                         res += sizeof(infiCmiChunkHeader);
2283                         for(i=0;i<count;i++){
2284                                 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2285                                 metaData->key = key;
2286                                 metaData->owner = hdr;
2287                                 metaData->poolIdx = poolIdx;
2288                                 metaData->parentPe = thread;                                            // setting the parent PE
2289                                 if(i == 0){
2290                                         metaData->owner->metaData->count = count;
2291                                         metaData->nextBuf = NULL;
2292                                 }else{
2293                                         void *startBuf = res - sizeof(infiCmiChunkHeader);
2294                                         metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2295                                         infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2296                                         infiCmiChunkPools[thread][poolIdx].count++;
2297                                 }
2298                                 if(i != count-1){
2299                                         res += (allocSize+sizeof(infiCmiChunkHeader));
2300                                 }
2301                         }
2302                 }
2303         }       
2304 }
2305
2306 static inline void *getInfiCmiChunkThread(int dataSize){
2307         //find out to which pool this dataSize belongs to
2308         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2309         int ratio = dataSize/firstBinSize;
2310         int poolIdx=0;
2311         void *res;
2312         int i,j,nodeSize;
2313         void *pointer;
2314
2315         //printf("Hi\n");
2316         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2317         MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2318         
2319         while(ratio > 0){
2320                 ratio  = ratio >> 1;
2321                 poolIdx++;
2322         }
2323         MACHSTATE1(2,"This is %d",CmiMyRank());
2324         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2325
2326         // checking whether to analyze the free queues to reuse buffers 
2327         nodeSize = CmiMyNodeSize() + 1;
2328         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2329                 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2330                 for(i = 0; i < nodeSize; i++){
2331                         if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2332                                 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2333                                         pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2334                                         infi_CmiFreeDirect(pointer);    
2335                                 }
2336                         }
2337                 }       
2338         }
2339
2340         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2341                 infiCmiChunkMetaData *metaData;         
2342                 infiCmiChunkHeader *hdr;
2343                 int allocSize;
2344                 int count=1;
2345                 int i;
2346                 struct ibv_mr *key;
2347                 void *origres;
2348                 
2349                 
2350                 if(poolIdx < INFINUMPOOLS ){
2351                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2352                 }else{
2353                         allocSize = dataSize;
2354                 }
2355
2356                 if(poolIdx < blockThreshold){
2357                         count = blockAllocRatio;
2358                 }
2359                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2360                 _MEMCHECK(res);
2361                 hdr = res;
2362                 
2363                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2364                 CmiAssert(key != NULL);
2365 #if CMK_IBVERBS_STATS
2366                 numCurReg++;
2367                 numReg++;
2368 #endif
2369                 
2370                 origres = (res += sizeof(infiCmiChunkHeader));
2371
2372                 for(i=0;i<count;i++){
2373                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2374                         _MEMCHECK(metaData);
2375                         metaData->key = key;
2376                         metaData->owner = hdr;
2377                         metaData->poolIdx = poolIdx;
2378                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2379
2380                         if(i == 0){
2381                                 metaData->owner->metaData->count = count;
2382                                 metaData->nextBuf = NULL;
2383                         }else{
2384                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2385                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2386                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2387                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2388                                 
2389                         }
2390                         if(i != count-1){
2391                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2392                         }
2393           }     
2394                 
2395                 
2396                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2397                 
2398                 return origres;
2399         }
2400         if(poolIdx < INFINUMPOOLS){
2401                 infiCmiChunkMetaData *metaData;                         
2402         
2403                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2404                 res += sizeof(infiCmiChunkHeader);
2405
2406                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2407                 metaData = METADATAFIELD(res);
2408
2409                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2410                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2411                 
2412                 metaData->nextBuf = NULL;
2413 //              CmiAssert(metaData->poolIdx == poolIdx);
2414
2415                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2416                 return res;
2417         }
2418
2419         CmiAssert(0);
2420
2421         
2422 };
2423 #else /* not MULTIPOOL case */
2424 static inline void *getInfiCmiChunk(int dataSize){
2425         //find out to which pool this dataSize belongs to
2426         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2427         int ratio = dataSize/firstBinSize;
2428         int poolIdx=0;
2429         char *res;
2430 #if CMK_IBVERBS_STATS
2431         if(numAlloc>10000 && numAlloc%1000==0)
2432           {
2433           printf("[%d] numReg %d numUnReg %d numCurReg %d numAlloc %d numFree %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,numReg, numUnReg, numCurReg, numAlloc, numFree, msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
2434           /*      printf("[%d]  numMultiSendUnreg %d numMultiSend %d  numMultiSendFree %d\n",_Cmi_mynode, numMultiSendUnreg, numMultiSend, numMultiSendFree);*/
2435           }
2436 #endif
2437         while(ratio > 0){
2438                 ratio  = ratio >> 1;
2439                 poolIdx++;
2440         }
2441         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2442         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2443                 infiCmiChunkMetaData *metaData;
2444                 infiCmiChunkHeader *hdr;
2445                 int allocSize;
2446                 int count=1;
2447                 int i;
2448                 struct ibv_mr *key;
2449                 void *origres;
2450
2451
2452                 if(poolIdx < INFINUMPOOLS ){
2453                         allocSize = infiCmiChunkPools[poolIdx].size;
2454                 }else{
2455                         allocSize = dataSize;
2456                 }
2457
2458                 if(poolIdx < blockThreshold){
2459                         count = blockAllocRatio;
2460                 }
2461                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2462                 hdr = (infiCmiChunkHeader *)res;
2463
2464                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2465                 CmiAssert(key != NULL);
2466 #if CMK_IBVERBS_STATS
2467                 numCurReg++;
2468                 numReg++;
2469 #endif
2470                 origres = (res += sizeof(infiCmiChunkHeader));
2471
2472                 for(i=0;i<count;i++){
2473                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2474                         metaData->key = key;
2475                         metaData->owner = hdr;
2476                         metaData->poolIdx = poolIdx;
2477
2478                         if(i == 0){
2479                                 metaData->owner->metaData->count = count;
2480                                 metaData->nextBuf = NULL;
2481                         }else{
2482                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2483                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2484                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2485                                 infiCmiChunkPools[poolIdx].count++;
2486
2487                         }
2488                         if(i != count-1){
2489                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2490                         }
2491           }
2492
2493
2494                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2495
2496                 return origres;
2497         }
2498         if(poolIdx < INFINUMPOOLS){
2499                 infiCmiChunkMetaData *metaData;
2500
2501                 res = infiCmiChunkPools[poolIdx].startBuf;
2502                 res += sizeof(infiCmiChunkHeader);
2503
2504                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2505                 metaData = METADATAFIELD(res);
2506
2507                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2508                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2509
2510                 metaData->nextBuf = NULL;
2511 //              CmiAssert(metaData->poolIdx == poolIdx);
2512
2513                 infiCmiChunkPools[poolIdx].count--;
2514                 return res;
2515         }
2516
2517         CmiAssert(0);
2518
2519
2520 };
2521 #endif
2522
2523
2524 void * infi_CmiAlloc(int size){
2525         char *res;
2526 #if CMK_IBVERBS_STATS
2527         numAlloc++;
2528 #endif
2529         if (Cmi_charmrun_fd == -1) return malloc(size);
2530 #if THREAD_MULTI_POOL
2531         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2532         res -= sizeof(CmiChunkHeader);
2533
2534         return res;
2535 #else
2536 #if CMK_SMP
2537         CmiMemLock();
2538 #endif
2539 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2540                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2541
2542                 res = (char*)getInfiCmiChunk(size-sizeof(CmiChunkHeader));      
2543                 res -= sizeof(CmiChunkHeader);
2544 #if CMK_SMP     
2545         CmiMemUnlock();
2546 #endif
2547 /*      }else{
2548                 res = malloc(size);
2549         }*/
2550         
2551         return res;
2552 #endif
2553 }
2554
2555 #if THREAD_MULTI_POOL
2556 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2557 void infi_CmiFreeDirect(void *ptr){
2558         int size;
2559         int parentPe;
2560         void *freePtr = ptr;
2561 #if CMK_IBVERBS_STATS
2562         numFree++;
2563 #endif
2564
2565         //ptr += sizeof(CmiChunkHeader);
2566         size = SIZEFIELD (ptr);
2567 /*      if(size > firstBinSize){*/
2568         infiCmiChunkMetaData *metaData;
2569         int poolIdx;
2570         //there is a infiniband specific header
2571         freePtr = ptr - sizeof(infiCmiChunkHeader);
2572         metaData = METADATAFIELD(ptr);
2573         poolIdx = metaData->poolIdx;
2574         infiCmiChunkPool *pool = infiCmiChunkPools[CmiMyRank()] + poolIdx;
2575         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2576 //      CmiAssert(poolIdx >= 0);
2577         if(poolIdx < INFINUMPOOLS && pool->count < INFIMAXPERPOOL &&
2578            pool->count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2579           metaData->nextBuf = pool->startBuf;
2580           pool->startBuf = freePtr;
2581           pool->count++;
2582           MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,pool->startBuf,pool->count);
2583         }else{
2584           MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2585           metaData->owner->metaData->count--;
2586           if(metaData->owner->metaData == metaData){
2587             //I am the owner
2588             if(metaData->owner->metaData->count == 0){
2589               //all the chunks have been freed
2590               int unregstat=ibv_dereg_mr(metaData->key);
2591 #if CMK_IBVERBS_STATS
2592               numUnReg++;
2593               numCurReg--;
2594 #endif
2595
2596               CmiAssert(unregstat==0);
2597               free(freePtr);
2598               free(metaData);
2599             }
2600             //if I am the owner and all the chunks have not been
2601             // freed dont free my metaData. will need later
2602           }else{
2603             if(metaData->owner->metaData->count == 0){
2604               //need to free the owner's buffer and metadata
2605               freePtr = metaData->owner;
2606               int unregstat=ibv_dereg_mr(metaData->key);
2607 #if CMK_IBVERBS_STATS
2608               numUnReg++;
2609               numCurReg--;
2610 #endif
2611
2612               CmiAssert(unregstat==0);
2613               free(metaData->owner->metaData);
2614               free(freePtr);
2615             }
2616             free(metaData);
2617           }
2618         }
2619 }
2620
2621
2622 void infi_CmiFree(void *ptr){
2623
2624         int i,j;
2625         int size;
2626         int parentPe;
2627         int nodeSize;
2628         void *pointer;
2629         void *freePtr = ptr;
2630         nodeSize = CmiMyNodeSize() + 1;
2631
2632         MACHSTATE(3,"Freeing");
2633
2634         ptr += sizeof(CmiChunkHeader);
2635         size = SIZEFIELD (ptr);
2636 /*      if(size > firstBinSize){*/
2637         infiCmiChunkMetaData *metaData;
2638         int poolIdx;
2639         //there is a infiniband specific header
2640         freePtr = ptr - sizeof(infiCmiChunkHeader);
2641         metaData = METADATAFIELD(ptr);
2642         poolIdx = metaData->poolIdx;
2643
2644         if(poolIdx == INFIMULTIPOOL){
2645                 /** this is a part of a received mult message  
2646                     it will be freed correctly later
2647                 **/
2648 #if CMK_IBVERBS_STATS
2649           numMultiSendFree++;
2650 #endif
2651           return;
2652         }
2653
2654
2655         // checking if this free operation is my responsibility
2656         parentPe = metaData->parentPe;
2657         if(parentPe != CmiMyRank()){
2658                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2659                 return;
2660         }
2661
2662
2663         infi_CmiFreeDirect(ptr);
2664
2665 }
2666
2667 #else
2668 void infi_CmiFree(void *ptr){
2669         int size;
2670         void *freePtr = ptr;
2671 #if CMK_IBVERBS_STATS
2672         numFree++;
2673 #endif
2674         
2675         if (Cmi_charmrun_fd == -1) return free(ptr);
2676 #if CMK_SMP     
2677         CmiMemLock();
2678 #endif
2679         ptr += sizeof(CmiChunkHeader);
2680         size = SIZEFIELD (ptr);
2681 /*      if(size > firstBinSize){*/
2682                 infiCmiChunkMetaData *metaData;
2683                 int poolIdx;
2684                 //there is a infiniband specific header
2685                 freePtr = (char*)ptr - sizeof(infiCmiChunkHeader);
2686                 metaData = METADATAFIELD(ptr);
2687                 poolIdx = metaData->poolIdx;
2688                 if(poolIdx == INFIMULTIPOOL){
2689                         /** this is a part of a received mult message  
2690                         it will be freed correctly later
2691                         **/
2692 #if CMK_IBVERBS_STATS
2693                         numMultiSendFree++;
2694 #endif
2695                         return;
2696                 }
2697                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2698 //              CmiAssert(poolIdx >= 0);
2699                 if(poolIdx < INFINUMPOOLS &&
2700                    infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL &&
2701                    infiCmiChunkPools[poolIdx].count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2702                   metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2703                   infiCmiChunkPools[poolIdx].startBuf = freePtr;
2704                   infiCmiChunkPools[poolIdx].count++;
2705                         
2706                   MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2707                 }else{
2708                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2709                         metaData->owner->metaData->count--;
2710                         if(metaData->owner->metaData == metaData){
2711                                 //I am the owner
2712                                 if(metaData->owner->metaData->count == 0){
2713                                         //all the chunks have been freed
2714                                         int unregstat=ibv_dereg_mr(metaData->key);
2715 #if CMK_IBVERBS_STATS
2716                                         numUnReg++;
2717                                         numCurReg--;
2718 #endif
2719
2720                                         CmiAssert(unregstat==0);
2721                                         free(freePtr);
2722                                         free(metaData);
2723                                 }
2724                                 //if I am the owner and all the chunks have not been
2725                                 // freed dont free my metaData. will need later
2726                         }else{
2727                                 if(metaData->owner->metaData->count == 0){
2728                                         //need to free the owner's buffer and metadata
2729                                         freePtr = metaData->owner;
2730                                         int unregstat=ibv_dereg_mr(metaData->key);
2731 #if CMK_IBVERBS_STATS
2732                                         numUnReg++;
2733                                         numCurReg--;
2734 #endif
2735
2736                                         CmiAssert(unregstat==0);
2737                                         free(metaData->owner->metaData);
2738                                         free(freePtr);
2739                                 }
2740                                 free(metaData);
2741                         }
2742                 }       
2743 #if CMK_SMP     
2744         CmiMemUnlock();
2745 #endif
2746 /*      }else{
2747                 free(freePtr);
2748         }*/
2749 }
2750 #endif
2751
2752 /*********************************************************************************************
2753 This section is for CmiDirect. This is a variant of the  persistent communication in which 
2754 the user can transfer data between processors without using Charm++ messages. This lets the user
2755 send and receive data from the middle of his arrays without any copying on either send or receive
2756 side
2757 *********************************************************************************************/
2758
/* Request packet from an earlier CmiDirect design in which the sender
   announced its buffer and the receiver fetched it with an RDMA read
   (see the commented-out processDirectRequest below). In the visible code it
   is only referenced from commented-out code, so it appears to be unused now. */
struct infiDirectRequestPacket{
        int senderProc;         /* node number of the sender (_Cmi_mynode) */
        int handle;             /* CmiDirect handle number */
        struct ibv_mr senderKey;        /* copy of the sender's memory registration (rkey used for the read) */
        void *senderBuf;        /* address of the sender's buffer */
        int senderBufSize;      /* size of senderBuf in bytes */
};
2766
2767 #include "cmidirect.h"
2768
/* Number of handle slots in one infiDirectHandleTable; larger handle numbers
   live in further tables chained through ->next (see calcHandleTableIdx). */
#define MAXHANDLES 512

/* forward declaration: a polling-queue node points back at its handle */
struct infiDirectHandleStruct;


/* Node of the CmiDirect polling queue (headDirectPollingQ/tailDirectPollingQ).
   The node is embedded in its infiDirectHandle (field pollingQNode), so
   queueing a handle never allocates memory. */
typedef struct directPollingQNodeStruct {
        struct infiDirectHandleStruct *handle;  /* handle this node belongs to */
        struct directPollingQNodeStruct *next;  /* next queued node, NULL at the tail */
        double *lastDouble;     /* last double of the receive buffer; polled for a change from initialValue */
} directPollingQNode;
2779
/* State of one CmiDirect handle. The same type is used in both tables:
   - receiver side (CmiDirect_createHandle): buf/size describe the receive
     buffer and callbackFnPtr(callbackData) runs once a transfer is detected;
   - sender side (CmiDirect_assocLocalBuffer): buf/size describe the send
     buffer, the callback fields are NULL and rdmaPacket is allocated. */
typedef struct infiDirectHandleStruct{
        int id;                 /* handle number */
        void *buf;              /* local buffer (receive or send side) */
        int size;               /* size of buf in bytes */
        struct ibv_mr *key;     /* ibv_reg_mr registration of buf */
        void (*callbackFnPtr)(void *);  /* receiver: called when data has arrived */
        void *callbackData;     /* argument passed to callbackFnPtr */
//      struct infiDirectRequestPacket *packet;
        struct infiDirectUserHandle userHandle; /* copy of the user-visible handle */
        struct infiRdmaPacket *rdmaPacket;      /* sender: used as wr_id of the RDMA write in CmiDirect_put */
        directPollingQNode pollingQNode;        /* embedded polling-queue node */
}       infiDirectHandle;
2792
/* A block of MAXHANDLES handle slots. Handle number h lives in block
   h/MAXHANDLES of a chain, at slot h%MAXHANDLES (see calcHandleTableIdx). */
typedef struct infiDirectHandleTableStruct{
        infiDirectHandle handles[MAXHANDLES];
        struct infiDirectHandleTableStruct *next;       /* next block in the chain, NULL if none */
} infiDirectHandleTable;
2797
2798
// CmiDirect bookkeeping

/* FIFO of receive handles still waiting for data; scanned by pollCmiDirectQ */
directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;

/* Per-node arrays of handle-table chains, created lazily:
   sendHandleTable[recverNode] on the sending side,
   recvHandleTable[senderNode] on the receiving side. */
static infiDirectHandleTable **sendHandleTable=NULL;
static infiDirectHandleTable **recvHandleTable=NULL;

/* Receiver side: last handle number issued per sender node (starts at -1) */
static int *recvHandleCount=NULL;
2807
2808 void addHandleToPollingQ(infiDirectHandle *handle){
2809 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2810         directPollingQNode *newNode = &(handle->pollingQNode);
2811         newNode->handle = handle;
2812         newNode->next = NULL;
2813         if(headDirectPollingQ==NULL){
2814                 /*empty pollingQ*/
2815                 headDirectPollingQ = newNode;
2816                 tailDirectPollingQ = newNode;
2817         }else{
2818                 tailDirectPollingQ->next = newNode;
2819                 tailDirectPollingQ = newNode;
2820         }
2821 };
2822 /*
2823 infiDirectHandle *removeHandleFromPollingQ(){
2824         if(headDirectPollingQ == NULL){
2825                 //polling Q is empty
2826                 return NULL;
2827         }
2828         directPollingQNode *retNode = headDirectPollingQ;
2829         if(headDirectPollingQ == tailDirectPollingQ){
2830                 //PollingQ has one node 
2831                 headDirectPollingQ = tailDirectPollingQ = NULL;
2832         }else{
2833                 headDirectPollingQ = headDirectPollingQ->next;
2834         }
2835         infiDirectHandle *retHandle = retNode->handle;
2836         free(retNode);
2837         return retHandle;
2838 }*/
2839
2840 static inline infiDirectHandleTable **createHandleTable(){
2841         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2842         int i;
2843         for(i=0;i<_Cmi_numnodes;i++){
2844                 table[i] = NULL;
2845         }
2846         return table;
2847 }
2848
2849 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2850         *tableIdx = handle/MAXHANDLES;
2851         *idx = handle%MAXHANDLES;
2852 };
2853
/**
 * Write initialValue into the last sizeof(double) bytes of recvBuf.
 *
 * This trailing double is the arrival sentinel for CmiDirect: a receive
 * handle counts as "received" once the value there differs from the
 * handle's initialValue. It is reset before each expected transfer.
 *
 * memcpy is used rather than a store through a double* because
 * recvBufSize need not be a multiple of sizeof(double), so the trailing slot
 * can be misaligned.
 *
 * @param recvBuf      receive buffer, at least sizeof(double) bytes long
 * @param recvBufSize  size of recvBuf in bytes
 * @param initialValue sentinel value to store
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	int index = recvBufSize - (int)sizeof(double);
	memcpy(((char *)recvBuf)+index, &initialValue, sizeof initialValue);
}
2861
2862
2863 /**
2864  To be called on the receiver to create a handle and return its number
2865 **/
2866 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2867         int newHandle;
2868         int tableIdx,idx;
2869         int i;
2870         infiDirectHandleTable *table;
2871         struct infiDirectUserHandle userHandle;
2872         
2873         CmiAssert(recvBufSize > sizeof(double));
2874         
2875         if(recvHandleTable == NULL){
2876                 recvHandleTable = createHandleTable();
2877                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2878                 for(i=0;i<_Cmi_numnodes;i++){
2879                         recvHandleCount[i] = -1;
2880                 }
2881         }
2882         if(recvHandleTable[senderNode] == NULL){
2883                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2884                 recvHandleTable[senderNode]->next = NULL;               
2885         }
2886         
2887         newHandle = ++recvHandleCount[senderNode];
2888         CmiAssert(newHandle >= 0);
2889         
2890         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2891         
2892         table = recvHandleTable[senderNode];
2893         for(i=0;i<tableIdx;i++){
2894                 if(table->next ==NULL){
2895                         table->next = malloc(sizeof(infiDirectHandleTable));
2896                         table->next->next = NULL;
2897                 }
2898                 table = table->next;
2899         }
2900         table->handles[idx].id = newHandle;
2901         table->handles[idx].buf = recvBuf;
2902         table->handles[idx].size = recvBufSize;
2903 #if CMI_DIRECT_DEBUG
2904         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2905 #endif
2906         table->handles[idx].callbackFnPtr = callbackFnPtr;
2907         table->handles[idx].callbackData = callbackData;
2908         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2909         CmiAssert(table->handles[idx].key != NULL);
2910 #if CMK_IBVERBS_STATS
2911         numCurReg++;
2912         numReg++;
2913 #endif
2914 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2915         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2916         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2917         
2918         userHandle.handle = newHandle;
2919         userHandle.recverNode = _Cmi_mynode;
2920         userHandle.senderNode = senderNode;
2921         userHandle.recverBuf = recvBuf;
2922         userHandle.recverBufSize = recvBufSize;
2923         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2924         userHandle.initialValue = initialValue;
2925         
2926         table->handles[idx].userHandle = userHandle;
2927         
2928         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2929
2930   {
2931          int index = table->handles[idx].size - sizeof(double);
2932    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2933         } 
2934         
2935         addHandleToPollingQ(&(table->handles[idx]));
2936         
2937 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2938         
2939         return userHandle;
2940 }
2941
2942 /****
2943  To be called on the sender to attach the sender's buffer to this handle
2944 ******/
2945 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2946         int tableIdx,idx;
2947         int i;
2948         int handle = userHandle->handle;
2949         int recverNode  = userHandle->recverNode;
2950
2951         infiDirectHandleTable *table;
2952
2953         if(sendHandleTable == NULL){
2954                 sendHandleTable = createHandleTable();
2955         }
2956         if(sendHandleTable[recverNode] == NULL){
2957                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2958                 sendHandleTable[recverNode]->next = NULL;
2959         }
2960         
2961         CmiAssert(handle >= 0);
2962         calcHandleTableIdx(handle,&tableIdx,&idx);
2963         
2964         table = sendHandleTable[recverNode];
2965         for(i=0;i<tableIdx;i++){
2966                 if(table->next ==NULL){
2967                         table->next = malloc(sizeof(infiDirectHandleTable));
2968                         table->next->next = NULL;
2969                 }
2970                 table = table->next;
2971         }
2972
2973         table->handles[idx].id = handle;
2974         table->handles[idx].buf = sendBuf;
2975
2976         table->handles[idx].size = sendBufSize;
2977 #if CMI_DIRECT_DEBUG
2978         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2979 #endif
2980         table->handles[idx].callbackFnPtr = NULL;
2981         table->handles[idx].callbackData = NULL;
2982         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2983         CmiAssert(table->handles[idx].key != NULL);
2984 #if CMK_IBVERBS_STATS
2985         numCurReg++;
2986         numReg++;
2987 #endif
2988         table->handles[idx].userHandle = *userHandle;
2989         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2990         
2991         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2992         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2993         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2994         
2995         
2996 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2997         table->handles[idx].packet->senderProc = _Cmi_mynode;
2998         table->handles[idx].packet->handle = handle;
2999         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
3000         table->handles[idx].packet->senderBuf = sendBuf;
3001         table->handles[idx].packet->senderBufSize = sendBufSize;*/
3002         
3003         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
3004 };
3005
3006
3007
3008
3009
3010 /****
3011 To be called on the sender to do the actual data transfer
3012 ******/
3013 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
3014         int handle = userHandle->handle;
3015         int recverNode = userHandle->recverNode;
3016         if(recverNode == _Cmi_mynode){
3017                 /*when the sender and receiver are on the same
3018                 processor, just look up the sender and receiver
3019                 buffers and do a memcpy*/
3020
3021                 infiDirectHandleTable *senderTable;
3022                 infiDirectHandleTable *recverTable;
3023                 
3024                 int tableIdx,idx,i;
3025
3026                 
3027                 /*find entry for this handle in sender table*/
3028                 calcHandleTableIdx(handle,&tableIdx,&idx);
3029                 CmiAssert(sendHandleTable!= NULL);
3030                 senderTable = sendHandleTable[_Cmi_mynode];
3031                 CmiAssert(senderTable != NULL);
3032                 for(i=0;i<tableIdx;i++){
3033                         senderTable = senderTable->next;
3034                 }
3035
3036                 /**find entry for this handle in recver table*/
3037                 recverTable = recvHandleTable[recverNode];
3038                 CmiAssert(recverTable != NULL);
3039                 for(i=0;i< tableIdx;i++){
3040                         recverTable = recverTable->next;
3041                 }
3042                 
3043                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
3044                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
3045 #if CMI_DIRECT_DEBUG
3046                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
3047 #endif
3048                 // The polling Q should find you and handle the callback and pollingq entry
3049                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
3050                 
3051
3052         }else{
3053                 infiPacket packet;
3054                 int tableIdx,idx;
3055                 int i;
3056                 OtherNode node;
3057                 infiDirectHandleTable *table;
3058
3059                 calcHandleTableIdx(handle,&tableIdx,&idx);
3060
3061                 table = sendHandleTable[recverNode];
3062                 CmiAssert(table != NULL);
3063                 for(i=0;i<tableIdx;i++){
3064                         table = table->next;
3065                 }
3066
3067 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
3068 #if CMI_DIRECT_DEBUG
3069                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
3070 #endif
3071
3072                 
3073                 {
3074                         
3075                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
3076                         struct ibv_sge list = {
3077                                 .addr = (uintptr_t )table->handles[idx].buf,
3078                                 .length = table->handles[idx].size,
3079                                 .lkey   = table->handles[idx].key->lkey
3080                         };
3081                         
3082                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
3083
3084                         struct ibv_send_wr *bad_wr;
3085                         struct ibv_send_wr wr = {
3086                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3087                                 .sg_list = &list,
3088                                 .num_sge = 1,
3089                                 .opcode = IBV_WR_RDMA_WRITE,
3090                                 .send_flags = IBV_SEND_SIGNALED,
3091                                 
3092                                 .wr.rdma = {
3093                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
3094                                         .rkey = remoteKey->rkey
3095                                 }
3096                         };
3097                         /** post and rdma_read that is a rdma get*/
3098                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3099                                 CmiAssert(0);
3100                         }
3101                 }
3102
3103         /*      MallocInfiPacket (packet);
3104                 {
3105                         packet->size = sizeof(struct infiDirectRequestPacket);
3106                         packet->buf = (char *)(table->handles[idx].packet);
3107                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3108                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3109                 }*/
3110         }
3111
3112 };
3113
3114 /**** need not be called the first time *********/
3115 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3116   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3117 }
3118
3119 /**** need not be called the first time *********/
3120 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3121         int handle = userHandle->handle;
3122         int tableIdx,idx,i;
3123         infiDirectHandleTable *table;
3124         calcHandleTableIdx(handle,&tableIdx,&idx);
3125         
3126         table = recvHandleTable[userHandle->senderNode];
3127         CmiAssert(table != NULL);
3128         for(i=0;i<tableIdx;i++){
3129                 table = table->next;
3130         }
3131 #if CMI_DIRECT_DEBUG
3132   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3133 #endif  
3134         addHandleToPollingQ(&(table->handles[idx]));
3135         
3136
3137 }
3138
3139 /**** need not be called the first time *********/
3140 void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
3141         int handle = userHandle->handle;
3142         int tableIdx,idx,i;
3143         infiDirectHandleTable *table;
3144         
3145         initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3146
3147         calcHandleTableIdx(handle,&tableIdx,&idx);
3148         
3149         table = recvHandleTable[userHandle->senderNode];
3150         CmiAssert(table != NULL);
3151         for(i=0;i<tableIdx;i++){
3152                 table = table->next;
3153         }
3154 #if CMI_DIRECT_DEBUG
3155   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
3156 #endif  
3157         addHandleToPollingQ(&(table->handles[idx]));
3158         
3159 }
3160
3161
3162 static int receivedDirectMessage(infiDirectHandle *handle){
3163 //      int index = handle->size - sizeof(double);
3164 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
3165         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3166                 return 0;
3167         }else{
3168                 (*(handle->callbackFnPtr))(handle->callbackData);       
3169                 return 1;
3170         }
3171         
3172 }
3173
3174
3175 static void pollCmiDirectQ(){
3176         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
3177         while(ptr != NULL){
3178                 if(receivedDirectMessage(ptr->handle)){
3179 #if CMI_DIRECT_DEBUG
3180       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
3181 #endif
3182                         directPollingQNode *delPtr = ptr;
3183                         /** has been received and delete this node***/
3184                         if(prevPtr == NULL){
3185                                 /** first in the pollingQ**/
3186                                 if(headDirectPollingQ == tailDirectPollingQ){
3187                                         /**only node in pollingQ****/
3188                                         headDirectPollingQ = tailDirectPollingQ = NULL;
3189                                 }else{
3190                                         headDirectPollingQ = headDirectPollingQ->next;
3191                                 }
3192                         }else{
3193                                 if(ptr == tailDirectPollingQ){
3194                                         /**last node is being deleted**/
3195                                         tailDirectPollingQ = prevPtr;
3196                                 }
3197                                 prevPtr->next = ptr->next;
3198                         }
3199                         ptr = ptr->next;
3200                 //      free(delPtr);
3201                 }else{
3202                         prevPtr = ptr;
3203                         ptr = ptr->next;
3204                 }
3205         }
3206 }
3207
3208
3209 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3210         int senderProc = directRequestPacket->senderProc;
3211         int handle = directRequestPacket->handle;
3212         int tableIdx,idx,i;
3213         infiDirectHandleTable *table;
3214         OtherNode node = nodes_by_pe[senderProc];
3215
3216         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3217
3218         calcHandleTableIdx(handle,&tableIdx,&idx);
3219
3220         table = recvHandleTable[senderProc];
3221         CmiAssert(table != NULL);
3222         for(i=0;i<tableIdx;i++){
3223                 table = table->next;
3224         }
3225         
3226         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3227         
3228         {
3229                 struct ibv_sge list = {
3230                         .addr = (uintptr_t )table->handles[idx].buf,
3231                         .length = table->handles[idx].size,
3232                         .lkey   = table->handles[idx].key->lkey
3233                 };
3234
3235                 struct ibv_send_wr *bad_wr;
3236                 struct ibv_send_wr wr = {
3237                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3238                         .sg_list = &list,
3239                         .num_sge = 1,
3240                         .opcode = IBV_WR_RDMA_READ,
3241                         .send_flags = IBV_SEND_SIGNALED,
3242                         .wr.rdma = {
3243                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3244                                 .rkey = directRequestPacket->senderKey.rkey
3245                         }
3246                 };
3247 //       post and rdma_read that is a rdma get
3248                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3249                         CmiAssert(0);
3250                 }
3251         }
3252                         
3253         
3254 };*/
3255 /*
3256 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3257         MACHSTATE(3,"processDirectWC");
3258         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3259         (*(handle->callbackFnPtr))(handle->callbackData);
3260 };
3261 */
3262
3263 #if 0
3264
3265 // use the common one
3266
3267 static void sendBarrierMessage(int pe)
3268 {
3269   /* we will only need one packet */
3270   int size=32;
3271   OtherNode  node = nodes + pe;
3272   infiPacket packet;
3273   MallocInfiPacket(packet);
3274   packet->size = size;
3275   packet->buf = CmiAlloc(size);
3276   packet->header.code = INFIBARRIERPACKET;
3277   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3278   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3279   /*  pollSendCq(0);*/