Functionality added to the CmiCommunicationInit(...) function.
[charm.git] / src / arch / net / machine-ibverbs.c
/** @file
 * Ibverbs (InfiniBand) implementation of the Converse NET version
 * @ingroup NET
 * Contains only the Ibverbs-specific code for:
 * - CmiMachineInit()
 * - CmiCommunicationInit()
 * - CmiNotifyStillIdle()
 * - DeliverViaNetwork()
 * - CommunicationServer()
 * - CmiMachineExit()
 *
 * Created by Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */
16
17 /**
18  * @addtogroup NET
19  * @{
20  */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <malloc.h>
#include <getopt.h>
#include <time.h>

#include <infiniband/verbs.h>
32
33 enum ibv_mtu mtu = IBV_MTU_2048;
/* ---- Tunables. All are assigned in CmiMachineInit() unless noted otherwise ---- */
static int page_size;        /* set from sysconf(_SC_PAGESIZE) in allocateInfiBufferPool() */
static int mtu_size;         /* base unit from which packetSize is derived */
static int packetSize;       /* mtu_size*4; also the size of every posted receive buffer */
static int dataSize;         /* packetSize minus sizeof(struct infiPacketHeader) */
static int rdma;             /* non-zero when RDMA is switched on */
static int rdmaThreshold;    /* presumably the message size above which RDMA is used -- confirm against the send path */
static int firstBinSize;     /* asserted to be smaller than rdmaThreshold; presumably the smallest chunk-pool bin -- confirm */
static int blockAllocRatio;  /* NOTE(review): consumer is not visible in this part of the file */
static int blockThreshold;   /* NOTE(review): consumer is not visible in this part of the file */


static int maxRecvBuffers;   /* number of receive buffers posted; also sizes the receive CQ and the SRQ */
static int maxTokens;        /* initial context->tokensLeft; also sizes the send CQ and each QP's send queue */
/* A per-peer budget (tokensPerProcessor: number of outstanding sends and
 * receives between any two nodes) is currently disabled. */
static int sendPacketPoolSize; /*total number of send buffers created*/

static double _startTime=0;

/* ---- Statistics; reset in CmiMachineInit() only when CMK_IBVERBS_STATS is on ---- */
static int regCount;

static int pktCount;         /* incremented once per packet in EnqueuePacket() */
static int msgCount;
static int minTokensLeft;    /* low-water mark of context->tokensLeft */


static double regTime;

static double processBufferedTime;
static int processBufferedCount;
62
/* ---- Compile-time switches ---- */
#define CMK_IBVERBS_STATS 0        /* 1: maintain and print the statistics counters */
#define CMK_IBVERBS_TOKENS_FLOW 1  /* 1: each posted send consumes one of context->tokensLeft */
#define CMK_IBVERBS_INCTOKENS 0    /* never turn this on */
#define CMK_IBVERBS_DEBUG 0        /* 1: carry a packet sequence number (psn) in headers and per-node data */
#define CMI_DIRECT_DEBUG 0
#define WC_LIST_SIZE 32
/*#define WC_BUFFER_SIZE 100*/




/* Thresholds used only by the CMK_IBVERBS_INCTOKENS code path */
#define INCTOKENS_FRACTION 0.04
#define INCTOKENS_INCREASE .50

// flag for using a pool for every thread
#define THREAD_MULTI_POOL 0

#if THREAD_MULTI_POOL
#include "pcqueue.h"
PCQueue **queuePool;
#endif

#define INFIBARRIERPACKET 128
86
/* Payload of an INFIPACKETCODE_INCTOKENSACK packet.
 * NOTE(review): the meaning of `a` is not visible in this part of the file. */
struct infiIncTokenAckPacket{
	int a;
};

/* Idle-state placeholder: this layer keeps no idle state
 * (CmiNotifyGetState() returns NULL). */
typedef struct {
char none;
} CmiIdleState;

//TODO:ERASE this
/* Debug counters, only reported through MACHSTATE in CmiMachineExit(). */
static int TESTneighbor;
static int TESTfrees;
98
99
100 /********
101 ** The notify idle methods
102 ***/
103
104 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
105
106 static void CmiNotifyStillIdle(CmiIdleState *s);
107
108
109 static void CmiNotifyBeginIdle(CmiIdleState *s)
110 {
111   CmiNotifyStillIdle(s);
112 }
113
114 void CmiNotifyIdle(void) {
115   CmiNotifyStillIdle(NULL);
116 }
117
118 /***************
119 Data Structures 
120 ***********************/
121
/******
	This is a header attached to the beginning of every infiniband packet.

	The code values are distinct bits; the send path ORs flags into `code`
	(e.g. code |= INFIPACKETCODE_INCTOKENS).
*******/
#define INFIPACKETCODE_DATA 1
#define INFIPACKETCODE_INCTOKENS 2
#define INFIRDMA_START 4
#define INFIRDMA_ACK 8
#define INFIDIRECT_REQUEST 16
#define INFIPACKETCODE_INCTOKENSACK 32
#define INFIDUMMYPACKET 64

struct infiPacketHeader{
	char code;      /* OR of the packet-code bits above */
	int nodeNo;     /* sending node (context->header.nodeNo is set to _Cmi_mynode) */
#if	CMK_IBVERBS_DEBUG
	int psn;        /* per-destination packet sequence number, debug builds only */
#endif
};
140
141 /*
142         Types of rdma packets
143 */
144 #define INFI_MESG 1 
145 #define INFI_DIRECT 2
146
147 struct infiRdmaPacket{
148         int fromNodeNo;
149         int type;
150         struct ibv_mr key;
151         struct ibv_mr *keyPtr;
152         int remoteSize;
153         char *remoteBuf;
154         void *localBuffer;
155         OutgoingMsg ogm;
156         struct infiRdmaPacket *next,*prev;
157 };
158
159
/** Represents a buffer that is used to receive messages
*/
#define BUFFER_RECV 1
#define BUFFER_RDMA 2
struct infiBuffer{
	int type;               /* BUFFER_RECV or BUFFER_RDMA */
	char *buf;              /* start of the buffer memory */
	int size;               /* buffer length in bytes */
	struct ibv_mr *key;     /* memory region the buffer is registered under */
};
170
171
172
/** At the moment it is a simple pool with just a list of buffers
	* TODO; extend it to make it an element in a linklist of pools
*/
struct infiBufferPool{
	int numBuffers;                 /* number of entries in `buffers` */
	struct infiBuffer *buffers;     /* array of numBuffers descriptors */
	struct infiBufferPool *next;    /* set to NULL by allocateInfiBufferPool() */
};
181
182 /*****
183         It is the structure for the send buffers that are used
184         to send messages to other nodes
185 ********/
186
187
188 typedef struct infiPacketStruct {       
189         char *buf;
190         int size;
191         struct infiPacketHeader header;
192         struct ibv_mr *keyHeader;
193         struct OtherNodeStruct *destNode;
194         struct infiPacketStruct *next;
195         OutgoingMsg ogm;
196         struct ibv_sge elemList[2];
197         struct ibv_send_wr wr;
198 }* infiPacket;
199
200 /*
201 typedef struct infiBufferedWCStruct{
202         struct ibv_wc wcList[WC_BUFFER_SIZE];
203         int count;
204         struct infiBufferedWCStruct *next,*prev;
205 } * infiBufferedWC;
206 */
207
/* Number of broadcast slots held by one pool node */
#define BCASTLIST_SIZE 50

/* One buffered broadcast.
 * NOTE(review): field meanings are inferred from their names; the code that
 * fills them in is not visible in this part of the file. */
struct infiBufferedBcastStruct{
	char *msg;
	int size;
	int broot;
	int asm_rank;
	int valid;
};

/* Node of the doubly linked list anchored at context->bufferedBcastList. */
typedef struct infiBufferedBcastPoolStruct{
	struct infiBufferedBcastPoolStruct *next,*prev;
	struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
	int count;

} *infiBufferedBcastPool;
224
225
226
227
228 /***
229         This structure represents the data needed by the infiniband
230         communication routines of a node
231         TODO: add locking for the smp version
232 */
233 struct infiContext {
234         struct ibv_context      *context;
235         
236         fd_set  asyncFds;
237         struct timeval tmo;
238         
239         int ibPort;
240 //      struct ibv_comp_channel *channel;
241         struct ibv_pd           *pd;
242         struct ibv_cq           *sendCq;
243         struct ibv_cq   *recvCq;
244         struct ibv_srq  *srq;
245         
246         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
247                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
248                                                                                                 //when the qps are stored in the corresponding OtherNodes
249
250         struct infiAddr *localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
251
252         infiPacket infiPacketFreeList; 
253         
254         struct infiBufferPool *recvBufferPool;
255
256         struct infiPacketHeader header;
257
258         int srqSize;
259         int sendCqSize,recvCqSize;
260         int tokensLeft;
261
262         infiBufferedBcastPool bufferedBcastList;
263         
264         struct infiRdmaPacket *bufferedRdmaAcks;
265         
266         struct infiRdmaPacket *bufferedRdmaRequests;
267 /*      infiBufferedWC infiBufferedRecvList;*/
268         
269         int insideProcessBufferedBcasts;
270 };
271
272 static struct infiContext *context;
273
274
275
276
/** Represents a qp used to send messages to another node
 There is one for each remote node
*/
struct infiAddr {
	int lid,qpn,psn;    /* local identifier, queue pair number, initial packet sequence number */
};
283
/**
 Stored in the OtherNode structure in machine-dgram.c
 Store the per node data for ibverbs layer
*/
/* Values of infiOtherNodeData.state */
enum { INFI_HEADER_DATA=21,INFI_DATA};

struct infiOtherNodeData{
	struct ibv_qp *qp ;     /* connected queue pair to that node */
	int state;// does it expect a packet with a header (first packet) or one without
	int totalTokens;
	int tokensLeft;
	int nodeNo;             /* index of the remote node */

	int postedRecvs;
	int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
#if	CMK_IBVERBS_DEBUG
	int psn;                /* last sequence number sent, debug builds only */
	int recvPsn;
#endif
};
304
305
/********************************
Memory management structures and types
*****************/

struct infiCmiChunkHeaderStruct;

/* Per-chunk bookkeeping reached through METADATAFIELD().
 * NOTE(review): except for `key`, field meanings are inferred from names; the
 * allocator that maintains them is not visible in this part of the file. */
typedef struct infiCmiChunkMetaDataStruct {
	struct ibv_mr *key;     /* registration key of the chunk (newPacket() takes the packet's lkey from it) */
	int poolIdx;
	void *nextBuf;
	struct infiCmiChunkHeaderStruct *owner;
	int count;

#if THREAD_MULTI_POOL
	int parentPe;						// the PE that allocated the buffer and must release it
#endif
} infiCmiChunkMetaData;
323
324
325
326
/* Metadata pointer kept in the chunk header that sits immediately in front of
 * user pointer `m`.  The argument is parenthesised so that an expression such
 * as `p + off` is cast as a whole rather than only its first operand. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)

typedef struct {
	int size;//without infiCmiChunkHeader
	void *startBuf;
	int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 20
#define INFIMAXPERPOOL 100
#define INFIMULTIPOOL (-5)

#if THREAD_MULTI_POOL
infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif

static void initInfiCmiChunkPools();
347
348
349 static inline infiPacket newPacket(){
350         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
351         pkt->size = -1;
352         pkt->header = context->header;
353         pkt->next = NULL;
354         pkt->destNode = NULL;
355         pkt->keyHeader = METADATAFIELD(pkt)->key;
356         pkt->ogm=NULL;
357         CmiAssert(pkt->keyHeader!=NULL);
358         
359         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
360         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
361         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
362         
363         pkt->wr.wr_id = (uint64_t)pkt;
364         pkt->wr.sg_list = &(pkt->elemList[0]);
365         pkt->wr.num_sge = 2;
366         pkt->wr.opcode = IBV_WR_SEND;
367         pkt->wr.send_flags = IBV_SEND_SIGNALED;
368         pkt->wr.next = NULL;
369         
370         return pkt;
371 };
372
/* Return `pkt` to the head of context->infiPacketFreeList.
 * `pkt` is evaluated several times, so pass a side-effect-free expression. */
#define FreeInfiPacket(pkt){ \
	(pkt)->size = -1;\
	(pkt)->ogm=NULL;\
	(pkt)->next = context->infiPacketFreeList; \
	context->infiPacketFreeList = (pkt); \
}

/* Store a usable packet in `pkt`: the head of the free list when there is
 * one, otherwise a freshly created packet from newPacket().
 * The temporary has a deliberately unusual name so that it cannot capture a
 * caller variable used inside the `pkt` argument. */
#define MallocInfiPacket(pkt) { \
	infiPacket infiMallocPkt_ = context->infiPacketFreeList; \
	if(infiMallocPkt_ == NULL){ infiMallocPkt_ = newPacket();} \
		 else{context->infiPacketFreeList = infiMallocPkt_->next; } \
	(pkt) = infiMallocPkt_;\
}
386
387
388
389
390 /******************CmiMachineInit and its helper functions*/
391 static inline int pollSendCq(const int toBuffer);
392
393 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
394 static uint16_t getLocalLid(struct ibv_context *context, int port);
395 static int  checkQp(struct ibv_qp *qp){
396         struct ibv_qp_attr attr;
397         struct ibv_qp_init_attr init_attr;
398                  
399         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
400         if(attr.cur_qp_state != IBV_QPS_RTS){
401                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
402                 return 0;
403         }
404         return 1;
405 }
406 static void checkAllQps(){
407         int i;
408         for(i=0;i<_Cmi_numnodes;i++){
409                 if(i != _Cmi_mynode){
410                         if(!checkQp(nodes[i].infiData->qp)){
411                                 pollSendCq(0);
412                                 CmiAssert(0);
413                         }
414                 }
415         }
416 }
417
418 #if CMK_IBVERBS_FAST_START
419 static void send_partial_init();
420 #endif
421
422 static void CmiMachineInit(char **argv){
423         struct ibv_device **devList;
424         struct ibv_device *dev;
425         int ibPort;
426         int i;
427         int calcMaxSize;
428         infiPacket *pktPtrs;
429         struct infiRdmaPacket **rdmaPktPtrs;
430
431         MACHSTATE(3,"CmiMachineInit {");
432         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
433         MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
434         
435         //TODO: make the device and ibport configureable by commandline parameter
436         //Check example for how to do that
437         devList =  ibv_get_device_list(NULL);
438         CmiAssert(devList != NULL);
439
440         dev = *devList;
441         CmiAssert(dev != NULL);
442
443         ibPort=1;
444
445         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
446
447         context = (struct infiContext *)malloc(sizeof(struct infiContext));
448         
449         MACHSTATE1(3,"context allocated %p",context);
450         
451         //localAddr will store the local addresses of all the qps
452         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
453         
454         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
455         
456         context->ibPort = ibPort;
457         //the context for this infiniband device 
458         context->context = ibv_open_device(dev);
459         CmiAssert(context->context != NULL);
460         
461         MACHSTATE1(3,"device opened %p",context->context);
462
463 /*      FD_ZERO(&context->asyncFds);
464         FD_SET(context->context->async_fd,&context->asyncFds);
465         context->tmo.tv_sec=0;
466         context->tmo.tv_usec=0;
467         
468         MACHSTATE(3,"asyncFds zeroed and set");*/
469
470         //protection domain
471         context->pd = ibv_alloc_pd(context->context);
472         CmiAssert(context->pd != NULL);
473         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
474
475   /******** At this point we know that this node is more or less serviceable
476         So, this is a good point for sending the partial init message for the fast
477         start case
478         Moreover, no work dependent on the number of nodes has started yet.
479         ************/
480
481 #if CMK_IBVERBS_FAST_START
482   send_partial_init();
483 #endif
484
485
486         context->header.nodeNo = _Cmi_mynode;
487
488         mtu_size=1200;
489         packetSize = mtu_size*4;
490         dataSize = packetSize-sizeof(struct infiPacketHeader);
491         
492         calcMaxSize=8000;
493 /*      if(_Cmi_numnodes*50 > calcMaxSize){
494                 calcMaxSize = _Cmi_numnodes*50;
495                 if(calcMaxSize > 10000){
496                         calcMaxSize = 10000;
497                 }
498         }*/
499 //      maxRecvBuffers=80;
500         maxRecvBuffers=calcMaxSize;
501         maxTokens = maxRecvBuffers;
502 //      maxTokens = 80;
503         context->tokensLeft=maxTokens;
504         //tokensPerProcessor=4;
505         if(_Cmi_numnodes > 1){
506                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
507         }
508         
509         
510         //TURN ON RDMA
511         rdma=1;
512 //      rdmaThreshold=32768;
513         rdmaThreshold=22000;
514         firstBinSize = 120;
515         CmiAssert(rdmaThreshold > firstBinSize);
516         blockAllocRatio=16;
517         blockThreshold=8;
518
519         //TODO:erase this
520         int ppn = 0;
521         CmiGetArgInt(argv,"+ppn",&CmiMyNodeSize());
522         MACHSTATE1(3,"CmiMyNodeSize %d",CmiMyNodeSize());
523         
524 #if !THREAD_MULTI_POOL
525         initInfiCmiChunkPools();
526 #endif
527
528         /*create the pool of send packets*/
529         sendPacketPoolSize = maxTokens/2;       
530         if(sendPacketPoolSize > 2000){
531                 sendPacketPoolSize = 2000;
532         }
533         
534         context->infiPacketFreeList=NULL;
535         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
536
537         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
538 #if !THREAD_MULTI_POOL
539         for(i=0;i<sendPacketPoolSize;i++){
540                 MallocInfiPacket(pktPtrs[i]);   
541         }
542
543         for(i=0;i<sendPacketPoolSize;i++){
544                 FreeInfiPacket(pktPtrs[i]);     
545         }
546         free(pktPtrs);
547 #endif
548         
549         context->bufferedBcastList=NULL;
550         context->bufferedRdmaAcks = NULL;
551         context->bufferedRdmaRequests = NULL;
552         context->insideProcessBufferedBcasts=0;
553         
554         
555         if(rdma){
556 /*              int numPkts;
557                 int k;
558                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
559                         numPkts = _Cmi_numnodes*4;
560                 }else{
561                         numPkts = maxRecvBuffers/4;
562                 }
563                 
564                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
565                 for(k=0;k<numPkts;k++){
566                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
567                 }
568                 
569                 for(k=0;k<numPkts;k++){
570                         CmiFree(rdmaPktPtrs[k]);
571                 }
572                 free(rdmaPktPtrs);*/
573         }
574         
575 /*      context->infiBufferedRecvList = NULL;*/
576 #if CMK_IBVERBS_STATS   
577         regCount =0;
578         regTime  = 0;
579
580         pktCount=0;
581         msgCount=0;
582
583         processBufferedCount=0;
584         processBufferedTime=0;
585
586         minTokensLeft = maxTokens;
587 #endif  
588
589         
590
591         MACHSTATE(3,"} CmiMachineInit");
592 }
593
/**
 * Communication-side initialisation hook.
 *
 * Only does work when THREAD_MULTI_POOL is on, in which case the chunk pools
 * are initialised here rather than in CmiMachineInit().
 *
 * @param argv command line (unused)
 */
void CmiCommunicationInit(char **argv)
{
#if THREAD_MULTI_POOL
	initInfiCmiChunkPools();
#endif
}
600
601 /*********
602         Open a qp for every processor
603 *****/
604 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
605         int myLid;
606         int i;
607         
608         
609         //find my lid
610         myLid = getLocalLid(context->context,ibPort);
611         
612         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
613
614         context->sendCqSize = maxTokens+2;
615         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
616         CmiAssert(context->sendCq != NULL);
617         
618         MACHSTATE1(3,"sendCq created %p",context->sendCq);
619         
620         
621         context->recvCqSize = maxRecvBuffers;
622         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
623         
624         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
625         CmiAssert(context->recvCq != NULL);
626         
627         //array of queue pairs
628
629         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
630
631         if(numNodes > 1)
632         {
633                 context->srqSize = (maxRecvBuffers+2);
634                 struct ibv_srq_init_attr srqAttr = {
635                         .attr = {
636                         .max_wr  = context->srqSize,
637                         .max_sge = 1
638                         }
639                 };
640                 context->srq = ibv_create_srq(context->pd,&srqAttr);
641                 CmiAssert(context->srq != NULL);
642         
643                 struct ibv_qp_init_attr initAttr = {
644                         .qp_type = IBV_QPT_RC,
645                         .send_cq = context->sendCq,
646                         .recv_cq = context->recvCq,
647                         .srq             = context->srq,
648                         .sq_sig_all = 0,
649                         .qp_context = NULL,
650                         .cap     = {
651                                 .max_send_wr  = maxTokens,
652                                 .max_send_sge = 2,
653                         },
654                 };
655                 struct ibv_qp_attr attr;
656
657                 attr.qp_state        = IBV_QPS_INIT;
658                 attr.pkey_index      = 0;
659                 attr.port_num        = ibPort;
660                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
661
662 /*              MACHSTATE1(3,"context->pd %p",context->pd);
663                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
664                 MACHSTATE1(3,"TEST QP %p",qp);*/
665
666                 for( i=0;i<numNodes;i++){
667                         if(i == myNode){
668                         }else{
669                                 localAddr[i].lid = myLid;
670                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
671                         
672                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
673                         
674                                 CmiAssert(context->qp[i] != NULL);
675                         
676                         
677                                 ibv_modify_qp(context->qp[i], &attr,
678                                           IBV_QP_STATE              |
679                                           IBV_QP_PKEY_INDEX         |
680                                         IBV_QP_PORT               |
681                                         IBV_QP_ACCESS_FLAGS);           
682
683                                 localAddr[i].qpn = context->qp[i]->qp_num;
684                                 localAddr[i].psn = lrand48() & 0xffffff;
685                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
686                         }
687                 }
688         }
689         MACHSTATE(3,"qps created");
690 };
691
692 void copyInfiAddr(ChInfiAddr *qpList){
693         int qpListIdx=0;
694         int i;
695         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
696         for(i=0;i<_Cmi_numnodes;i++){
697                 if(i == _Cmi_mynode){
698                 }else{
699                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
700                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
701                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
702                         qpListIdx++;
703                 }
704         }
705 }
706
707
708 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
709         struct ibv_port_attr attr;
710
711         if (ibv_query_port(dev_context, port, &attr))
712                 return 0;
713
714         return attr.lid;
715 }
716
717 /**************** END OF CmiMachineInit and its helper functions*/
718
719 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
720 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
721
722 /* Initial the infiniband specific data for a remote node
723         1. connect the qp and store it in and return it
724 **/
725 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
726         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
727         int err;
728         ret->state = INFI_HEADER_DATA;
729         ret->qp = context->qp[node];
730 //      ret->totalTokens = tokensPerProcessor;
731 //      ret->tokensLeft = tokensPerProcessor;
732         ret->nodeNo = node;
733 //      ret->postedRecvs = tokensPerProcessor;
734 #if     CMK_IBVERBS_DEBUG       
735         ret->psn = 0;
736         ret->recvPsn = 0;
737 #endif
738         
739         struct ibv_qp_attr attr = {
740                 .qp_state               = IBV_QPS_RTR,
741                 .path_mtu               = mtu,
742                 .dest_qp_num            = addr[1],
743                 .rq_psn                 = addr[2],
744                 .max_dest_rd_atomic     = 1,
745                 .min_rnr_timer          = 31,
746                 .ah_attr                = {
747                         .is_global      = 0,
748                         .dlid           = addr[0],
749                         .sl             = 0,
750                         .src_path_bits  = 0,
751                         .port_num       = context->ibPort
752                 }
753         };
754         
755         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
756         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
757         
758         if (err = ibv_modify_qp(ret->qp, &attr,
759           IBV_QP_STATE              |
760           IBV_QP_AV                 |
761           IBV_QP_PATH_MTU           |
762           IBV_QP_DEST_QPN           |
763           IBV_QP_RQ_PSN             |
764           IBV_QP_MAX_DEST_RD_ATOMIC |
765           IBV_QP_MIN_RNR_TIMER)) {
766                         MACHSTATE1(3,"ERROR %d",err);
767                         CmiAbort("failed to change qp state to RTR");
768         }
769
770         MACHSTATE(3,"qp state changed to RTR");
771         
772         attr.qp_state       = IBV_QPS_RTS;
773         attr.timeout        = 26;
774         attr.retry_cnt      = 20;
775         attr.rnr_retry      = 7;
776         attr.sq_psn         = context->localAddr[node].psn;
777         attr.max_rd_atomic  = 1;
778
779         
780         if (ibv_modify_qp(ret->qp, &attr,
781           IBV_QP_STATE              |
782           IBV_QP_TIMEOUT            |
783           IBV_QP_RETRY_CNT          |
784           IBV_QP_RNR_RETRY          |
785           IBV_QP_SQ_PSN             |
786           IBV_QP_MAX_QP_RD_ATOMIC)) {
787                         fprintf(stderr, "Failed to modify QP to RTS\n");
788                         exit(1);
789         }
790         MACHSTATE(3,"qp state changed to RTS");
791
792         MACHSTATE(3,"} initInfiOtherNodeData");
793         return ret;
794 }
795
796
797 void    infiPostInitialRecvs(){
798         //create the pool and post the receives
799         int numPosts;
800 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
801                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
802         }else{
803                 numPosts = maxRecvBuffers;
804         }*/
805
806         if(_Cmi_numnodes > 1){
807                 numPosts = maxRecvBuffers;
808         }else{
809                 numPosts = 0;
810         }
811         if(numPosts > 0){
812                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
813                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
814         }
815
816
817         free(context->qp);
818         context->qp = NULL;
819         free(context->localAddr);
820         context->localAddr= NULL;
821 }
822
823 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
824         int numBuffers;
825         int i;
826         int bigSize;
827         char *bigBuf;
828         struct infiBufferPool *ret;
829         struct ibv_mr *bigKey;
830
831         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
832
833         page_size = sysconf(_SC_PAGESIZE);
834         ret = malloc(sizeof(struct infiBufferPool));
835         ret->next = NULL;
836         numBuffers=ret->numBuffers = numRecvs;
837         
838         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
839         
840         bigSize = numBuffers*sizePerBuffer;
841         bigBuf=malloc(bigSize);
842         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
843         CmiAssert(bigKey != NULL);
844         
845         for(i=0;i<numBuffers;i++){
846                 struct infiBuffer *buffer =  &(ret->buffers[i]);
847                 buffer->type = BUFFER_RECV;
848                 buffer->size = sizePerBuffer;
849                 
850                 /*buffer->buf = malloc(sizePerBuffer);
851                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
852                 buffer->buf = &bigBuf[i*sizePerBuffer];
853                 buffer->key = bigKey;
854
855                 if(buffer->key == NULL){
856                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
857                         CmiAssert(buffer->key != NULL);
858                 }
859         }
860         return ret;
861 };
862
863
864
865 /**
866          Post the buffers as recv work requests
867 */
868 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
869         int j,err;
870         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
871         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
872         struct ibv_recv_wr *bad_wr;
873         
874         int startBufferIdx=0;
875         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
876         for(j=0;j<numRecvs;j++){
877                 
878                 
879                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
880                 sgElements[j].length = sizePerBuffer;
881                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
882                 
883                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
884                 workRequests[j].sg_list = &sgElements[j];
885                 workRequests[j].num_sge = 1;
886                 if(j != numRecvs-1){
887                         workRequests[j].next = &workRequests[j+1];
888                 }
889                 
890         }
891         workRequests[numRecvs-1].next = NULL;
892         MACHSTATE(3,"About to call ibv_post_srq_recv");
893         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
894                 CmiAssert(0);
895         }
896
897         free(workRequests);
898         free(sgElements);
899 }
900
901
902
903
904 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
905
906 static void CmiMachineExit()
907 {
908 #if CMK_IBVERBS_STATS   
909         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
910 #endif
911
912         //TODO: ERASE this
913         MACHSTATE2(3,"Free msgs %d not owned %d",TESTfrees,TESTneighbor);
914 }
915 static void ServiceCharmrun_nolock();
916
917 static void CmiNotifyStillIdle(CmiIdleState *s) {
918 #if CMK_SMP
919         CmiCommLock();
920 /*      if(where == COMM_SERVER_FROM_SMP)*/
921 #endif
922 /*              ServiceCharmrun_nolock();*/
923
924         CommunicationServer_nolock(0);
925 #if CMK_SMP
926         CmiCommUnlock();
927 #endif
928 }
929
930 static inline void increaseTokens(OtherNode node);
931
932 static inline int pollRecvCq(const int toBuffer);
933 static inline int pollSendCq(const int toBuffer);
934
935
936 static inline void getFreeTokens(struct infiOtherNodeData *infiData){
937 #if !CMK_IBVERBS_TOKENS_FLOW
938         return;
939 #else
940         //if(infiData->tokensLeft == 0){
941         if(context->tokensLeft == 0){
942                 MACHSTATE(3,"GET FREE TOKENS {{{");
943         }else{
944                 return;
945         }
946         while(context->tokensLeft == 0){
947                 CommunicationServer_nolock(1); 
948         }
949         MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
950 #endif
951 }
952
953
954 /**
955         Packetize this data and send it
956 **/
957
958
959
960 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
961         int incTokens=0;
962         int retval;
963 #if     CMK_IBVERBS_DEBUG
964         packet->header.psn = (++node->infiData->psn);
965 #endif  
966
967
968
969         packet->elemList[1].addr = (uintptr_t)packet->buf;
970         packet->elemList[1].length = size;
971         packet->elemList[1].lkey = dataKey->lkey;
972         
973         
974         packet->destNode = node;
975         
976 #if CMK_IBVERBS_STATS   
977         pktCount++;
978 #endif  
979         
980         getFreeTokens(node->infiData);
981
982 #if CMK_IBVERBS_INCTOKENS       
983         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
984                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
985                 incTokens=1;
986         }
987 #endif
988 /*
989         if(!checkQp(node->infiData->qp)){
990                 pollSendCq(1);
991                 CmiAssert(0);
992         }*/
993
994         struct ibv_send_wr *bad_wr=NULL;
995         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
996                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
997                 CmiAssert(0);
998         }
999 #if     CMK_IBVERBS_TOKENS_FLOW
1000         context->tokensLeft--;
1001 #if     CMK_IBVERBS_STATS
1002         if(context->tokensLeft < minTokensLeft){
1003                 minTokensLeft = context->tokensLeft;
1004         }
1005 #endif
1006 #endif
1007
1008 /*      if(!checkQp(node->infiData->qp)){
1009                 pollSendCq(1);
1010                 CmiAssert(0);
1011         }*/
1012
1013 #if CMK_IBVERBS_INCTOKENS       
1014         if(incTokens){
1015                 increaseTokens(node);
1016         }
1017 #endif
1018
1019
1020 #if     CMK_IBVERBS_DEBUG
1021         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1022 #else
1023         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1024 #endif
1025
1026 };
1027
1028
1029 static void inline EnqueueDummyPacket(OtherNode node,int size){
1030         infiPacket packet;
1031         MallocInfiPacket(packet);
1032         packet->size = size;
1033         packet->buf = CmiAlloc(size);
1034         
1035         packet->header.code = INFIDUMMYPACKET;
1036
1037         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1038         
1039         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1040         EnqueuePacket(node,packet,size,key);
1041 }
1042
1043
1044
1045
1046
1047
1048 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1049         infiPacket packet;
1050         MallocInfiPacket(packet);
1051         packet->size = size;
1052         packet->buf=data;
1053         
1054         //the nodeNo is added at time of packet allocation
1055         packet->header.code = INFIPACKETCODE_DATA;
1056         
1057         ogm->refcount++;
1058         packet->ogm = ogm;
1059         
1060         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1061         CmiAssert(key != NULL);
1062         
1063         EnqueuePacket(node,packet,size,key);
1064 };
1065
1066 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1067 static inline void processAllBufferedMsgs();
1068
1069 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1070         int size; char *data;
1071 //      processAllBufferedMsgs();
1072
1073         
1074         ogm->refcount++;
1075   size = ogm->size;
1076   data = ogm->data;
1077
1078 #if CMK_IBVERBS_STATS   
1079         msgCount++;
1080 #endif
1081
1082         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1083         //First packet has dgram header, other packets dont
1084         
1085   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1086         
1087         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1088
1089         if(rdma && size > rdmaThreshold){
1090                         EnqueueRdmaPacket(ogm,node);
1091         }else{
1092         
1093                 while(size > dataSize){
1094                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1095                         size -= dataSize;
1096                         data += dataSize;
1097                 }
1098                 if(size > 0){
1099                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1100                 }
1101         }
1102 //#if   !CMK_SMP
1103         processAllBufferedMsgs();
1104 //#endif
1105         ogm->refcount--;
1106         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1107 }
1108
1109
1110 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1111         infiPacket packet;
1112
1113         ogm->refcount++;
1114         
1115         MallocInfiPacket(packet);
1116  
1117  {
1118                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1119
1120                 
1121                 packet->size = sizeof(struct infiRdmaPacket);
1122                 packet->buf = (char *)rdmaPacket;
1123                 
1124 /*              struct ibv_mr *key = ibv_reg_mr(context->pd,ogm->data,ogm->size,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE);*/
1125                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1126                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1127                 
1128                 packet->header.code = INFIRDMA_START;
1129                 packet->header.nodeNo = _Cmi_mynode;
1130                 packet->ogm = NULL;
1131
1132                 rdmaPacket->type = INFI_MESG;
1133                 rdmaPacket->ogm = ogm;
1134                 rdmaPacket->key = *key;
1135                 rdmaPacket->keyPtr = key;
1136                 rdmaPacket->remoteBuf = ogm->data;
1137                 rdmaPacket->remoteSize = ogm->size;
1138                 
1139                 
1140                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1141                 
1142                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1143                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1144         }
1145 }
1146
1147
1148
1149 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1150 static inline void processSendWC(struct ibv_wc *sendWC);
1151 static unsigned int _count=0;
1152 extern int errno;
1153 static int _countAsync=0;
1154 static inline void processAsyncEvents(){
1155         struct ibv_async_event event;
1156         int ready;
1157         _countAsync++;
1158         if(_countAsync < 1){
1159                 return;
1160         }
1161         _countAsync=0;
1162         FD_SET(context->context->async_fd,&context->asyncFds);
1163         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1164         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1165         
1166         if(ready==0){
1167                 return;
1168         }
1169         if(ready == -1){
1170 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1171                 return;
1172         }
1173         
1174         if (ibv_get_async_event(context->context, &event)){
1175                 return;
1176                 CmiAbort("get async event failed");
1177         }
1178         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1179         ibv_ack_async_event(&event);
1180
1181         
1182 }
1183
1184 static void pollCmiDirectQ();
1185
1186 static inline  void CommunicationServer_nolock(int toBuffer) {
1187         int processed;
1188         if(_Cmi_numnodes <= 1){
1189                 pollCmiDirectQ();
1190                 return;
1191         }
1192         MACHSTATE(2,"CommServer_nolock{");
1193         
1194 //      processAsyncEvents();
1195         
1196 //      checkAllQps();
1197
1198         pollCmiDirectQ();
1199
1200         processed = pollRecvCq(toBuffer);
1201         
1202
1203         processed += pollSendCq(toBuffer);
1204         
1205         if(toBuffer == 0){
1206 //              if(processed != 0)
1207                         processAllBufferedMsgs();
1208         }
1209         
1210 //      checkAllQps();
1211 //      _count--;
1212         MACHSTATE(2,"} CommServer_nolock ne");
1213         
1214 }
1215 /*
1216 static inline infiBufferedWC createInfiBufferedWC(){
1217         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1218         ret->count = 0;
1219         ret->next = ret->prev =NULL;
1220         return ret;
1221 }*/
1222
1223 /****
1224         The buffered recvWC are stored in a doubly linked list of 
1225         arrays or blocks of wcs.
1226         To keep the average insert cost low, a new block is added 
1227         to the top of the list. (resulting in a reverse seq of blocks)
1228         Within a block however wc are stored in a sequence
1229 *****/
1230 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1231         infiBufferedWC block;
1232         MACHSTATE(3,"Insert Buffered Recv called");
1233         if( context->infiBufferedRecvList ==NULL){
1234                 context->infiBufferedRecvList = createInfiBufferedWC();
1235                 block = context->infiBufferedRecvList;
1236         }else{
1237                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1238                         block = createInfiBufferedWC();
1239                         context->infiBufferedRecvList->prev = block;
1240                         block->next = context->infiBufferedRecvList;
1241                         context->infiBufferedRecvList = block;
1242                 }else{
1243                         block = context->infiBufferedRecvList;
1244                 }
1245         }
1246         
1247         block->wcList[block->count] = *wc;
1248         block->count++;
1249 };
1250 */
1251
1252
1253 /********
1254 go through the blocks of bufferedWC. Process the last block first.
1255 Then the next one and so on. (Processing within a block should happen
1256 in sequence).
1257 Leave the last block in place to avoid having to allocate again
1258 ******/
1259 /*static inline void processBufferedRecvList(){
1260         infiBufferedWC start;
1261         start = context->infiBufferedRecvList;
1262         while(start->next != NULL){
1263                 start = start->next;
1264         }
1265         while(start != NULL){
1266                 int i=0;
1267                 infiBufferedWC tmp;
1268                 for(i=0;i<start->count;i++){
1269                         processRecvWC(&start->wcList[i]);
1270                 }
1271                 if(start != context->infiBufferedRecvList){
1272                         //not the first one
1273                         tmp = start;
1274                         start = start->prev;
1275                         free(tmp);
1276                         start->next = NULL;
1277                 }else{
1278                         start = start->prev;
1279                 }
1280         }
1281         context->infiBufferedRecvList->next = NULL;
1282         context->infiBufferedRecvList->prev = NULL;
1283         context->infiBufferedRecvList->count = 0;
1284 }
1285 */
1286
1287 static inline int pollRecvCq(const int toBuffer){
1288         int i;
1289         int ne;
1290         struct ibv_wc wc[WC_LIST_SIZE];
1291         
1292         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1293         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1294 //      CmiAssert(ne >=0);
1295         
1296         if(ne != 0){
1297                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1298         }
1299         
1300         for(i=0;i<ne;i++){
1301                 if(wc[i].status != IBV_WC_SUCCESS){
1302                         CmiAssert(0);
1303                 }
1304                 switch(wc[i].opcode){
1305                         case IBV_WC_RECV:
1306                                         processRecvWC(&wc[i],toBuffer);
1307                                 break;
1308                         default:
1309                                 CmiAbort("Wrong type of work completion object in recvq");
1310                                 break;
1311                 }
1312                         
1313         }
1314         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1315         return ne;
1316
1317 }
1318
1319 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1320
1321 static inline int pollSendCq(const int toBuffer){
1322         int i;
1323         int ne;
1324         struct ibv_wc wc[WC_LIST_SIZE];
1325
1326         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1327 //      CmiAssert(ne >=0);
1328         
1329         
1330         for(i=0;i<ne;i++){
1331                 if(wc[i].status != IBV_WC_SUCCESS){
1332                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1333 #if CMK_IBVERBS_STATS
1334         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1335 #endif
1336                         CmiAssert(0);
1337                 }
1338                 switch(wc[i].opcode){
1339                         case IBV_WC_SEND:{
1340                                 //message received
1341                                 processSendWC(&wc[i]);
1342                                 
1343                                 break;
1344                                 }
1345                         case IBV_WC_RDMA_READ:
1346                         {
1347 //                              processRdmaWC(&wc[i],toBuffer);
1348                                         processRdmaWC(&wc[i],1);
1349                                 break;
1350                         }
1351                         case IBV_WC_RDMA_WRITE:
1352                         {
1353                                 /*** used for CmiDirect puts 
1354                                 Nothing needs to be done on the sender side once send is done **/
1355                                 break;
1356                         }
1357                         default:
1358                                 CmiAbort("Wrong type of work completion object in recvq");
1359                                 break;
1360                 }
1361                         
1362         }
1363         return ne;
1364 }
1365
1366
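/******************
Check the charmrun control socket (and any descriptors registered through
CmiStdoutAdd) for readability. The result for the control socket is left in
ctrlskt_ready_read; the return value is the result of the pipe-check call.
*****************/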
1371 int CheckSocketsReady(int withDelayMs)
1372 {   
1373   int nreadable;
1374   CMK_PIPE_DECL(withDelayMs);
1375
1376   CmiStdoutAdd(CMK_PIPE_SUB);
1377   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1378
1379   nreadable=CMK_PIPE_CALL();
1380   ctrlskt_ready_read = 0;
1381   dataskt_ready_read = 0;
1382   dataskt_ready_write = 0;
1383   
1384   if (nreadable == 0) {
1385     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1386     return nreadable;
1387   }
1388   if (nreadable==-1) {
1389     CMK_PIPE_CHECKERR();
1390     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1391     return CheckSocketsReady(0);
1392   }
1393   CmiStdoutCheck(CMK_PIPE_SUB);
1394   if (Cmi_charmrun_fd!=-1) 
1395           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1396   MACHSTATE(1,"} CheckSocketsReady")
1397   return nreadable;
1398 }
1399
1400
1401 /*** Service the charmrun socket
1402 *************/
1403
1404 static void ServiceCharmrun_nolock()
1405 {
1406   int again = 1;
1407   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1408   while (again)
1409   {
1410   again = 0;
1411   CheckSocketsReady(0);
1412   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1413   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1414   }
1415   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1416 }
1417
1418
1419
1420 static void CommunicationServer(int sleepTime, int where){
1421         if( where == COMM_SERVER_FROM_INTERRUPT){
1422                 return;
1423         }
1424 #if CMK_SMP
1425         if(where == COMM_SERVER_FROM_WORKER){
1426                 return;
1427         }
1428         if(where == COMM_SERVER_FROM_SMP){
1429 #endif
1430                 ServiceCharmrun_nolock();
1431 #if CMK_SMP
1432         }
1433         CmiCommLock();
1434 #endif
1435         CommunicationServer_nolock(0);
1436 #if CMK_SMP
1437         CmiCommUnlock();
1438 #endif
1439 }
1440
1441
1442 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
1443
1444
1445 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
1446
1447 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
1448         char *newmsg;
1449         
1450         MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
1451         
1452         OtherNode node = &nodes[nodeNo];
1453         newmsg = node->asm_msg;         
1454         
1455         /// This simple state machine determines if this packet marks the beginning of a new message
1456         // from another node, or if this is another in a sequence of packets
1457         switch(node->infiData->state){
1458                 case INFI_HEADER_DATA:
1459                 {
1460                         int size;
1461                         int rank, srcpe, seqno, magic, i;
1462                         unsigned int broot;
1463                         DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1464                         size = CmiMsgHeaderGetLength(msg);
1465                         MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
1466 //                      CmiAssert(size > 0);
1467 //                      CmiAssert(nodes_by_pe[srcpe] == node);
1468                         
1469 //                      CmiAssert(newmsg == NULL);
1470                         if(len > size){
1471                                 CmiPrintf("size: %d, len:%d.\n", size, len);
1472                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1473                         }
1474                         newmsg = (char *)CmiAlloc(size);
1475       _MEMCHECK(newmsg);
1476       memcpy(newmsg, msg, len);
1477       node->asm_rank = rank;
1478       node->asm_total = size;
1479       node->asm_fill = len;
1480       node->asm_msg = newmsg;
1481                         node->infiData->broot = broot;
1482                         if(len == size){
1483                                 //this is the only packet for this message 
1484                                 node->infiData->state = INFI_HEADER_DATA;
1485                         }else{
1486                                 //there are more packets following
1487                                 node->infiData->state = INFI_DATA;
1488                         }
1489                         break;
1490                 }
1491                 case INFI_DATA:
1492                 {
1493                         if(node->asm_fill + len < node->asm_total && len != dataSize){
1494                                 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1495                                 CmiAbort("packet in the middle does not have expected length");
1496                         }
1497                         if(node->asm_fill+len > node->asm_total){
1498                                 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1499                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1500                         }
1501                         //tODO: remove this
1502                         memcpy(newmsg + node->asm_fill,msg,len);
1503                         node->asm_fill += len;
1504                         if(node->asm_fill == node->asm_total){
1505                                 node->infiData->state = INFI_HEADER_DATA;
1506                         }else{
1507                                 node->infiData->state = INFI_DATA;
1508                         }
1509                         break;
1510                 }
1511         }
1512         /// if this packet was the last packet in a message ie state was 
1513         /// reset to infi_header_data
1514         
1515         if(node->infiData->state == INFI_HEADER_DATA){
1516                 int total_size = node->asm_total;
1517                 node->asm_msg = NULL;
1518                 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
1519                 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
1520         }
1521         
1522 };
1523
1524 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
1525 #if CMK_BROADCAST_SPANNING_TREE
1526         if (rank == DGRAM_BROADCAST
1527 #if CMK_NODE_QUEUE_AVAILABLE
1528           || rank == DGRAM_NODEBROADCAST
1529 #endif
1530          ){
1531                                          if(toBuffer){
1532                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1533                                                 }else{
1534                         SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
1535                                                 }
1536                                         }
1537 #elif CMK_BROADCAST_HYPERCUBE
1538         if (rank == DGRAM_BROADCAST
1539 #if CMK_NODE_QUEUE_AVAILABLE
1540           || rank == DGRAM_NODEBROADCAST
1541 #endif
1542          ){
1543                                          if(toBuffer){
1544                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1545                                          }else{
1546                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
1547                                                 }
1548                                         }
1549 #endif
1550
1551
1552                 
1553                 switch (rank) {
1554         case DGRAM_BROADCAST: {
1555                                 int i;
1556                                 for (i=1; i<_Cmi_mynodesize; i++){
1557                                         CmiPushPE(i, CopyMsg(newmsg, total_size));
1558                                 }
1559           CmiPushPE(0, newmsg);
1560           break;
1561       }
1562 #if CMK_NODE_QUEUE_AVAILABLE
1563         case DGRAM_NODEBROADCAST: 
1564         case DGRAM_NODEMESSAGE: {
1565           CmiPushNode(newmsg);
1566           break;
1567         }
1568 #endif
1569         default:
1570                                 {
1571                                         
1572           CmiPushPE(rank, newmsg);
1573                                 }
1574         }    /* end of switch */
1575                 if(!toBuffer){
1576 //#if !CMK_SMP          
1577                 processAllBufferedMsgs();
1578 //#endif
1579                 }
1580 }
1581
1582
1583 static inline void increasePostedRecvs(int nodeNo);
1584 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1585 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1586
1587 //struct infiDirectRequestPacket;
1588 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1589
1590 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1591         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1592         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1593         int nodeNo = header->nodeNo;
1594 #if     CMK_IBVERBS_DEBUG
1595         OtherNode node = &nodes[nodeNo];
1596 #endif
1597         
1598         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1599 #if     CMK_IBVERBS_DEBUG
1600         if(node->infiData->recvPsn == 0){
1601                 node->infiData->recvPsn = header->psn;
1602         }else{
1603                 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1604                 node->infiData->recvPsn++;
1605         }
1606         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1607 #else
1608         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1609 #endif
1610         
1611         if(header->code & INFIPACKETCODE_DATA){
1612                         
1613                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1614         }
1615         if(header->code & INFIDUMMYPACKET){
1616                 MACHSTATE(3,"Dummy packet");
1617         }
1618         if(header->code & INFIBARRIERPACKET){
1619                 MACHSTATE(3,"Barrier packet");
1620                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1621         }
1622
1623 #if CMK_IBVERBS_INCTOKENS       
1624         if(header->code & INFIPACKETCODE_INCTOKENS){
1625                 increasePostedRecvs(nodeNo);
1626         }
1627 #endif  
1628         if(rdma && header->code & INFIRDMA_START){
1629                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1630 //              if(toBuffer){
1631                         //TODO: make a function of this and use for both acks and requests
1632                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1633                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1634                         *copyPacket = *rdmaPacket;
1635                         copyPacket->fromNodeNo = nodeNo;
1636                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1637                         context->bufferedRdmaRequests = copyPacket;
1638                         copyPacket->next = tmp;
1639                         copyPacket->prev = NULL;
1640                         if(tmp != NULL){
1641                                 tmp->prev = copyPacket;
1642                         }
1643 /*              }else{
1644                         processRdmaRequest(rdmaPacket,nodeNo,0);
1645                 }*/
1646         }
1647         if(rdma && header->code & INFIRDMA_ACK){
1648                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1649                 processRdmaAck(rdmaPacket);
1650         }
1651 /*      if(header->code & INFIDIRECT_REQUEST){
1652                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1653                 processDirectRequest(directRequestPacket);
1654         }*/
1655         {
1656                 struct ibv_sge list = {
1657                         .addr   = (uintptr_t) buffer->buf,
1658                         .length = buffer->size,
1659                         .lkey   = buffer->key->lkey
1660                 };
1661         
1662                 struct ibv_recv_wr wr = {
1663                         .wr_id = (uint64_t)buffer,
1664                         .sg_list = &list,
1665                         .num_sge = 1,
1666                         .next = NULL
1667                 };
1668                 struct ibv_recv_wr *bad_wr;
1669         
1670                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1671                         CmiAssert(0);
1672                 }
1673         }
1674
1675 };
1676
1677
1678
1679
1680 static inline  void processSendWC(struct ibv_wc *sendWC){
1681
1682         infiPacket packet = (infiPacket )sendWC->wr_id;
1683 #if CMK_IBVERBS_TOKENS_FLOW
1684 //      packet->destNode->infiData->tokensLeft++;
1685         context->tokensLeft++;
1686 #endif
1687
1688         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1689         if(packet->ogm != NULL){
1690                 packet->ogm->refcount--;
1691                 if(packet->ogm->refcount == 0){
1692                         GarbageCollectMsg(packet->ogm); 
1693                 }
1694         }else{
1695                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1696
1697                 }
1698         }
1699
1700         FreeInfiPacket(packet);
1701 };
1702
1703
1704
1705 /********************************************************************/
1706 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1707         int nodeNo = fromNodeNo;
1708         OtherNode node = &nodes[nodeNo];
1709         struct infiRdmaPacket *rdmaPacket;
1710
1711         getFreeTokens(node->infiData);
1712 #if CMK_IBVERBS_TOKENS_FLOW
1713 //      node->infiData->tokensLeft--;
1714         context->tokensLeft--;
1715 #if     CMK_IBVERBS_STATS
1716         if(context->tokensLeft < minTokensLeft){
1717                 minTokensLeft = context->tokensLeft;
1718         }
1719 #endif
1720 #endif
1721         
1722         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1723 //      CmiAssert(buffer != NULL);
1724         
1725         
1726         if(isBuffered){
1727                 rdmaPacket = _rdmaPacket;
1728         }else{
1729                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1730                 *rdmaPacket = *_rdmaPacket;
1731         }
1732
1733
1734         rdmaPacket->fromNodeNo = fromNodeNo;
1735         rdmaPacket->localBuffer = (void *)buffer;
1736         
1737         buffer->type = BUFFER_RDMA;
1738         buffer->size = rdmaPacket->remoteSize;
1739         
1740         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1741 //      CmiAssert(buffer->buf != NULL);
1742
1743         buffer->key = METADATAFIELD(buffer->buf)->key;
1744
1745         
1746         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1747         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1748 //      CmiAssert(buffer->key != NULL);
1749         
1750         {
1751                 struct ibv_sge list = {
1752                         .addr = (uintptr_t )buffer->buf,
1753                         .length = buffer->size,
1754                         .lkey   = buffer->key->lkey
1755                 };
1756
1757                 struct ibv_send_wr *bad_wr;
1758                 struct ibv_send_wr wr = {
1759                         .wr_id = (uint64_t )rdmaPacket,
1760                         .sg_list = &list,
1761                         .num_sge = 1,
1762                         .opcode = IBV_WR_RDMA_READ,
1763                         .send_flags = IBV_SEND_SIGNALED,
1764                         .wr.rdma = {
1765                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1766                                 .rkey = rdmaPacket->key.rkey
1767                         }
1768                 };
1769                 /** post and rdma_read that is a rdma get*/
1770                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1771                         CmiAssert(0);
1772                 }
1773         }
1774
1775 };
1776
1777 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
1778 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1779
1780 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1781                 //rdma get done
1782 #if CMK_IBVERBS_STATS
1783         double _startRegTime;
1784 #endif  
1785
1786         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1787 /*      if(rdmaPacket->type == INFI_DIRECT){
1788                 processDirectWC(rdmaPacket);
1789                 return;
1790         }*/
1791 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1792         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1793
1794         /*TODO: remove this
1795         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1796         
1797 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1798         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1799         
1800         {
1801                 int size;
1802                 int rank, srcpe, seqno, magic, i;
1803                 unsigned int broot;
1804                 char *msg = buffer->buf;
1805                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1806                 size = CmiMsgHeaderGetLength(msg);
1807 /*              CmiAssert(size == buffer->size);*/
1808                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1809         }
1810         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1811
1812         
1813         free(buffer);
1814         
1815         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1816         //we are sending this ack as a response to a successful
1817         // rdma_Read.. the token for that rdma_Read needs to be freed
1818 #if CMK_IBVERBS_TOKENS_FLOW     
1819         //node->infiData->tokensLeft++;
1820         context->tokensLeft++;
1821 #endif
1822
1823         //send ack to sender if toBuffer is off otherwise buffer it
1824         if(toBuffer){
1825                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1826                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1827                 context->bufferedRdmaAcks = rdmaPacket;
1828                 rdmaPacket->next = tmp;
1829                 rdmaPacket->prev = NULL;
1830                 if(tmp != NULL){
1831                         tmp->prev = rdmaPacket; 
1832                 }
1833         }else{
1834                 EnqueueRdmaAck(rdmaPacket);             
1835                 free(rdmaPacket);
1836         }
1837 }
1838
1839 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1840         infiPacket packet;
1841         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1842
1843         
1844         MallocInfiPacket(packet);
1845         {
1846                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1847                 *ackPacket = *rdmaPacket;
1848                 packet->size = sizeof(struct infiRdmaPacket);
1849                 packet->buf = (char *)ackPacket;
1850                 packet->header.code = INFIRDMA_ACK;
1851                 packet->ogm=NULL;
1852                 
1853                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1854         
1855
1856                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1857         }
1858 };
1859
1860
1861 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1862         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1863         rdmaPacket->ogm->refcount--;
1864         GarbageCollectMsg(rdmaPacket->ogm);
1865 }
1866
1867
1868 /****************************
1869  Deal with all the buffered (delayed) messages
1870  such as processing recvd broadcasts, sending
1871  rdma acks and processing recvd rdma requests
1872 ******************************/
1873
1874
1875 static inline infiBufferedBcastPool createBcastPool(){
1876         int i;
1877         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1878         ret->count = 0;
1879         ret->next = ret->prev = NULL;   
1880         for(i=0;i<BCASTLIST_SIZE;i++){
1881                 ret->bcastList[i].valid = 0;
1882         }
1883         return ret;
1884 };
1885 /****
1886         The buffered bcast messages are stored in a doubly linked list of 
1887         arrays or blocks.
1888         To keep the average insert cost low, a new block is added 
1889         to the top of the list. (resulting in a reverse seq of blocks)
1890         Within a block however bcast are stored in increasing order sequence
1891 *****/
1892
1893 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1894         if(context->bufferedBcastList == NULL){
1895                 context->bufferedBcastList = createBcastPool();
1896         }else{
1897                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1898                         infiBufferedBcastPool tmp;
1899                         tmp = createBcastPool();
1900                         context->bufferedBcastList->prev = tmp;
1901                         tmp->next = context->bufferedBcastList;
1902                         context->bufferedBcastList = tmp;
1903                 }
1904         }
1905         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1906         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1907         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1908         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1909         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1910         
1911         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1912         
1913         context->bufferedBcastList->count++;
1914 }
1915
1916 /*********
1917         Go through the blocks of buffered bcast messages. Process the last block first;
1918         processing within a block is in sequence though.
1919 *********/
1920 static inline void processBufferedBcast(){
1921         infiBufferedBcastPool start;
1922
1923         if(context->bufferedBcastList == NULL){
1924                 return;
1925         }
1926         start = context->bufferedBcastList;
1927         if(context->insideProcessBufferedBcasts==1){
1928                 return;
1929         }
1930         context->insideProcessBufferedBcasts=1;
1931
1932         while(start->next != NULL){
1933                 start = start->next;
1934         }
1935         
1936         while(start != NULL){
1937                 int i=0;
1938                 infiBufferedBcastPool tmp;
1939                 if(start->count != 0){
1940                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1941                 }
1942                 for(i=0;i<start->count;i++){
1943                         if(start->bcastList[i].valid == 0){
1944                                 continue;
1945                         }
1946                         start->bcastList[i].valid=0;
1947                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1948 #if CMK_BROADCAST_SPANNING_TREE
1949         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1950 #if CMK_NODE_QUEUE_AVAILABLE
1951           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1952 #endif
1953          ){
1954                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
1955                                         }
1956 #elif CMK_BROADCAST_HYPERCUBE
1957         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1958 #if CMK_NODE_QUEUE_AVAILABLE
1959           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1960 #endif
1961          ){
1962                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
1963                                         }
1964 #endif
1965                 }
1966                 if(start->count != 0){
1967                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1968                 }
1969                 
1970                 tmp = start;
1971                 start = start->prev;
1972                 free(tmp);
1973                 if(start != NULL){
1974                         //not the first one
1975                         start->next = NULL;
1976                 }
1977         }
1978
1979         context->bufferedBcastList = NULL;
1980 /*      context->bufferedBcastList->prev = NULL;
1981         context->bufferedBcastList->count =0;   */
1982         context->insideProcessBufferedBcasts=0;
1983         MACHSTATE(2,"processBufferedBcast done ");
1984 };
1985
1986
1987 static inline void processBufferedRdmaAcks(){
1988         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
1989         if(start == NULL){
1990                 return;
1991         }
1992         while(start->next != NULL){
1993                 start = start->next;
1994         }
1995         while(start != NULL){
1996                 struct infiRdmaPacket *rdmaPacket=start;
1997                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
1998                 EnqueueRdmaAck(rdmaPacket);
1999                 start = start->prev;
2000                 free(rdmaPacket);
2001         }
2002         context->bufferedRdmaAcks=NULL;
2003 }
2004
2005
2006
2007 static inline void processBufferedRdmaRequests(){
2008         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2009         if(start == NULL){
2010                 return;
2011         }
2012         
2013         
2014         while(start->next != NULL){
2015                 start = start->next;
2016         }
2017         while(start != NULL){
2018                 struct infiRdmaPacket *rdmaPacket=start;
2019                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2020                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2021                 start = start->prev;
2022         }
2023         
2024         context->bufferedRdmaRequests=NULL;
2025 }
2026
2027
2028
2029
2030
/**
 * Drain all deferred work: buffered broadcasts first, then buffered rdma
 * acks, then buffered rdma requests. With CMK_IBVERBS_STATS enabled the
 * call is counted and its wall time added to processBufferedTime.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double drainStart = CmiWallTimer();
	processBufferedCount++;
#endif

	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
	processBufferedTime += (CmiWallTimer()-drainStart);
#endif
}
2044
2045
2046 /*************************
2047         Increase tokens when short of them
2048 **********/
2049 static inline void increaseTokens(OtherNode node){
2050         int err;
2051         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2052         if(node->infiData->totalTokens + increase > maxTokens){
2053                 increase = maxTokens-node->infiData->totalTokens;
2054         }
2055         node->infiData->totalTokens += increase;
2056         node->infiData->tokensLeft += increase;
2057         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2058         //increase the size of the sendCq
2059         int currentCqSize = context->sendCqSize;
2060         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2061                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2062                 CmiAssert(0);
2063         }
2064         context->sendCqSize+= increase;
2065 };
2066
2067 static void increasePostedRecvs(int nodeNo){
2068         OtherNode node = &nodes[nodeNo];
2069         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2070         int recvIncrease = tokenIncrease;
2071         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2072                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2073         }
2074         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2075                 recvIncrease = maxRecvBuffers-context->srqSize;
2076         }
2077         node->infiData->postedRecvs+= recvIncrease;
2078         context->srqSize += recvIncrease;
2079         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2080         //increase the size of the recvCq
2081         int currentCqSize = context->recvCqSize;
2082         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2083                 CmiAssert(0);
2084         }
2085         context->recvCqSize += tokenIncrease;
2086         if(recvIncrease > 0){
2087                 //create another bufferPool and attach it to the top of the current one
2088                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2089                 newPool->next = context->recvBufferPool;
2090                 context->recvBufferPool = newPool;
2091                 postInitialRecvs(newPool,recvIncrease,packetSize);
2092         }
2093
2094 };
2095
2096
2097
2098
2099 /*********************************************
2100         Memory management routines for RDMA
2101
2102 ************************************************/
2103
2104 /**
2105         There are INFINUMPOOLS of memory.
2106         The first pool is of size firstBinSize.
2107         The ith pool is of size firstBinSize*2^i
2108 */
2109
2110 static void initInfiCmiChunkPools(){
2111         int i,j;
2112         int size = firstBinSize;
2113         int nodeSize;
2114
2115 #if THREAD_MULTI_POOL
2116         nodeSize = CmiMyNodeSize() + 1;
2117         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2118         for(i = 0; i < nodeSize; i++){
2119                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2120         }
2121         for(j = 0; j < nodeSize; j++){
2122                 size = firstBinSize;
2123                 for(i=0;i<INFINUMPOOLS;i++){
2124                         infiCmiChunkPools[j][i].size = size;
2125                         infiCmiChunkPools[j][i].startBuf = NULL;
2126                         infiCmiChunkPools[j][i].count = 0;
2127                         size *= 2;
2128                 }
2129         }
2130
2131         // creating the n^2 system of queues
2132         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2133         for(i = 0; i < nodeSize; i++){
2134                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2135         }
2136         for(i = 0; i < nodeSize; i++)
2137                 for(j = 0; j < nodeSize; j++)
2138                         queuePool[i][j] = PCQueueCreate();
2139
2140 #else
2141
2142         size = firstBinSize;    
2143         for(i=0;i<INFINUMPOOLS;i++){
2144                 infiCmiChunkPools[i].size = size;
2145                 infiCmiChunkPools[i].startBuf = NULL;
2146                 infiCmiChunkPools[i].count = 0;
2147                 size *= 2;
2148         }
2149 #endif
2150
2151 }
2152
2153 /***********
2154 Register memory for a part of a received multisend message
2155 *************/
2156 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2157         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2158         char *res=msg-sizeof(infiCmiChunkHeader);
2159         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2160         metaData->owner = NULL;
2161         metaData->poolIdx = INFIMULTIPOOL;
2162
2163         return metaData;
2164 };
2165
2166
2167 #if THREAD_MULTI_POOL
2168 static inline void *getInfiCmiChunkThread(int dataSize){
2169         //find out to which pool this dataSize belongs to
2170         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2171         int ratio = dataSize/firstBinSize;
2172         int poolIdx=0;
2173         void *res;
2174
2175         //printf("Hi\n");
2176         MACHSTATE1(2,"Rank=%d",CmiMyRank());
2177         
2178         while(ratio > 0){
2179                 ratio  = ratio >> 1;
2180                 poolIdx++;
2181         }
2182         MACHSTATE1(2,"This is %d",CmiMyRank());
2183         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2184         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2185                 infiCmiChunkMetaData *metaData;         
2186                 infiCmiChunkHeader *hdr;
2187                 int allocSize;
2188                 int count=1;
2189                 int i;
2190                 struct ibv_mr *key;
2191                 void *origres;
2192                 
2193                 
2194                 if(poolIdx < INFINUMPOOLS ){
2195                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2196                 }else{
2197                         allocSize = dataSize;
2198                 }
2199
2200                 if(poolIdx < blockThreshold){
2201                         count = blockAllocRatio;
2202                 }
2203                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2204                 hdr = res;
2205                 
2206                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2207                 CmiAssert(key != NULL);
2208                 
2209                 origres = (res += sizeof(infiCmiChunkHeader));
2210
2211                 for(i=0;i<count;i++){
2212                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2213                         metaData->key = key;
2214                         metaData->owner = hdr;
2215                         metaData->poolIdx = poolIdx;
2216                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2217
2218                         if(i == 0){
2219                                 metaData->owner->metaData->count = count;
2220                                 metaData->nextBuf = NULL;
2221                         }else{
2222                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2223                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2224                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2225                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2226                                 
2227                         }
2228                         if(i != count-1){
2229                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2230                         }
2231           }     
2232                 
2233                 
2234                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2235                 
2236                 return origres;
2237         }
2238         if(poolIdx < INFINUMPOOLS){
2239                 infiCmiChunkMetaData *metaData;                         
2240         
2241                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2242                 res += sizeof(infiCmiChunkHeader);
2243
2244                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2245                 metaData = METADATAFIELD(res);
2246
2247                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2248                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2249                 
2250                 metaData->nextBuf = NULL;
2251 //              CmiAssert(metaData->poolIdx == poolIdx);
2252
2253                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2254                 return res;
2255         }
2256
2257         CmiAssert(0);
2258
2259         
2260 };
2261 #else
2262 static inline void *getInfiCmiChunk(int dataSize){
2263         //find out to which pool this dataSize belongs to
2264         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2265         int ratio = dataSize/firstBinSize;
2266         int poolIdx=0;
2267         void *res;
2268
2269         while(ratio > 0){
2270                 ratio  = ratio >> 1;
2271                 poolIdx++;
2272         }
2273         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2274         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2275                 infiCmiChunkMetaData *metaData;
2276                 infiCmiChunkHeader *hdr;
2277                 int allocSize;
2278                 int count=1;
2279                 int i;
2280                 struct ibv_mr *key;
2281                 void *origres;
2282
2283
2284                 if(poolIdx < INFINUMPOOLS ){
2285                         allocSize = infiCmiChunkPools[poolIdx].size;
2286                 }else{
2287                         allocSize = dataSize;
2288                 }
2289
2290                 if(poolIdx < blockThreshold){
2291                         count = blockAllocRatio;
2292                 }
2293                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2294                 hdr = res;
2295
2296                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2297                 CmiAssert(key != NULL);
2298
2299                 origres = (res += sizeof(infiCmiChunkHeader));
2300
2301                 for(i=0;i<count;i++){
2302                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2303                         metaData->key = key;
2304                         metaData->owner = hdr;
2305                         metaData->poolIdx = poolIdx;
2306
2307                         if(i == 0){
2308                                 metaData->owner->metaData->count = count;
2309                                 metaData->nextBuf = NULL;
2310                         }else{
2311                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2312                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2313                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2314                                 infiCmiChunkPools[poolIdx].count++;
2315
2316                         }
2317                         if(i != count-1){
2318                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2319                         }
2320           }
2321
2322
2323                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2324
2325                 return origres;
2326         }
2327         if(poolIdx < INFINUMPOOLS){
2328                 infiCmiChunkMetaData *metaData;
2329
2330                 res = infiCmiChunkPools[poolIdx].startBuf;
2331                 res += sizeof(infiCmiChunkHeader);
2332
2333                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2334                 metaData = METADATAFIELD(res);
2335
2336                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2337                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2338
2339                 metaData->nextBuf = NULL;
2340 //              CmiAssert(metaData->poolIdx == poolIdx);
2341
2342                 infiCmiChunkPools[poolIdx].count--;
2343                 return res;
2344         }
2345
2346         CmiAssert(0);
2347
2348
2349 };
2350 #endif
2351
2352
2353 void * infi_CmiAlloc(int size){
2354         void *res;
2355
2356
2357 #if THREAD_MULTI_POOL
2358         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2359         res -= sizeof(CmiChunkHeader);
2360         return res;
2361 #else
2362 #if CMK_SMP     
2363         CmiMemLock();
2364 #endif
2365 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2366                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2367
2368                 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));     
2369                 res -= sizeof(CmiChunkHeader);
2370 #if CMK_SMP     
2371         CmiMemUnlock();
2372 #endif
2373 /*      }else{
2374                 res = malloc(size);
2375         }*/
2376         return res;
2377 #endif
2378 }
2379
2380 #if THREAD_MULTI_POOL
2381 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2382 void infi_CmiFreeDirect(void *ptr){
2383         int size;
2384         int parentPe;
2385         void *freePtr = ptr;
2386
2387         //ptr += sizeof(CmiChunkHeader);
2388         size = SIZEFIELD (ptr);
2389 /*      if(size > firstBinSize){*/
2390         infiCmiChunkMetaData *metaData;
2391         int poolIdx;
2392         //there is a infiniband specific header
2393         freePtr = ptr - sizeof(infiCmiChunkHeader);
2394         metaData = METADATAFIELD(ptr);
2395         poolIdx = metaData->poolIdx;
2396
2397         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2398 //      CmiAssert(poolIdx >= 0);
2399         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].count <= INFIMAXPERPOOL){
2400                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2401                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = freePtr;
2402                         infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2403
2404                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf,infiCmiChunkPools[CmiMyRank()][poolIdx].count);
2405                 }else{
2406                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2407                         metaData->owner->metaData->count--;
2408                         if(metaData->owner->metaData == metaData){
2409                                 //I am the owner
2410                                 if(metaData->owner->metaData->count == 0){
2411                                         //all the chunks have been freed
2412                                         ibv_dereg_mr(metaData->key);
2413                                         free(freePtr);
2414                                         free(metaData);
2415                                 }
2416                                 //if I am the owner and all the chunks have not been
2417                                 // freed dont free my metaData. will need later
2418                         }else{
2419                                 if(metaData->owner->metaData->count == 0){
2420                                         //need to free the owner's buffer and metadata
2421                                         freePtr = metaData->owner;
2422                                         ibv_dereg_mr(metaData->key);
2423                                         free(metaData->owner->metaData);
2424                                         free(freePtr);
2425                                 }
2426                                 free(metaData);
2427                         }
2428                 }
2429 }
2430
2431
2432 void infi_CmiFree(void *ptr){
2433         int i,j;
2434         int size;
2435         int parentPe;
2436         int nodeSize;
2437         void *pointer;
2438         void *freePtr = ptr;
2439         nodeSize = CmiMyNodeSize() + 1;
2440
2441         MACHSTATE(3,"Freeing");
2442
2443         ptr += sizeof(CmiChunkHeader);
2444         size = SIZEFIELD (ptr);
2445 /*      if(size > firstBinSize){*/
2446         infiCmiChunkMetaData *metaData;
2447         int poolIdx;
2448         //there is a infiniband specific header
2449         freePtr = ptr - sizeof(infiCmiChunkHeader);
2450         metaData = METADATAFIELD(ptr);
2451         poolIdx = metaData->poolIdx;
2452
2453         if(poolIdx == INFIMULTIPOOL){
2454                 /** this is a part of a received mult message  
2455                     it will be freed correctly later
2456                 **/
2457
2458                 return;
2459         }
2460
2461
2462         //TODO: erase this
2463         TESTfrees++;
2464
2465         // checking if this free operation is my responsibility
2466         parentPe = metaData->parentPe;
2467         if(parentPe != CmiMyRank()){
2468                 TESTneighbor++;
2469                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2470                 return;
2471         }
2472
2473
2474         infi_CmiFreeDirect(ptr);
2475
2476         // checking free request queues
2477         for(i = 0; i < nodeSize; i++){
2478                 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2479                         for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2480                                 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2481                                 infi_CmiFreeDirect(pointer);    
2482                         }
2483                 }
2484         }
2485 }
2486
2487 #else
2488 void infi_CmiFree(void *ptr){
2489         int size;
2490         void *freePtr = ptr;
2491         
2492 #if CMK_SMP     
2493         CmiMemLock();
2494 #endif
2495         ptr += sizeof(CmiChunkHeader);
2496         size = SIZEFIELD (ptr);
2497 /*      if(size > firstBinSize){*/
2498                 infiCmiChunkMetaData *metaData;
2499                 int poolIdx;
2500                 //there is a infiniband specific header
2501                 freePtr = ptr - sizeof(infiCmiChunkHeader);
2502                 metaData = METADATAFIELD(ptr);
2503                 poolIdx = metaData->poolIdx;
2504                 if(poolIdx == INFIMULTIPOOL){
2505                         /** this is a part of a received mult message  
2506                         it will be freed correctly later
2507                         **/
2508                         
2509                         return;
2510                 }
2511                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2512 //              CmiAssert(poolIdx >= 0);
2513                 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
2514                         metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2515                         infiCmiChunkPools[poolIdx].startBuf = freePtr;
2516                         infiCmiChunkPools[poolIdx].count++;
2517                         
2518                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2519                 }else{
2520                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2521                         metaData->owner->metaData->count--;
2522                         if(metaData->owner->metaData == metaData){
2523                                 //I am the owner
2524                                 if(metaData->owner->metaData->count == 0){
2525                                         //all the chunks have been freed
2526                                         ibv_dereg_mr(metaData->key);
2527                                         free(freePtr);
2528                                         free(metaData);
2529                                 }
2530                                 //if I am the owner and all the chunks have not been
2531                                 // freed dont free my metaData. will need later
2532                         }else{
2533                                 if(metaData->owner->metaData->count == 0){
2534                                         //need to free the owner's buffer and metadata
2535                                         freePtr = metaData->owner;
2536                                         ibv_dereg_mr(metaData->key);
2537                                         free(metaData->owner->metaData);
2538                                         free(freePtr);
2539                                 }
2540                                 free(metaData);
2541                         }
2542                 }       
2543 #if CMK_SMP     
2544         CmiMemUnlock();
2545 #endif
2546 /*      }else{
2547                 free(freePtr);
2548         }*/
2549 }
2550 #endif
2551
/*********************************************************************************************
This section implements CmiDirect, a variant of persistent communication in which
the user can transfer data between processors without using Charm++ messages. It lets the user
send and receive data from the middle of their arrays without any copying on either the send or
the receive side.
*********************************************************************************************/
2558
2559 struct infiDirectRequestPacket{
2560         int senderProc;
2561         int handle;
2562         struct ibv_mr senderKey;
2563         void *senderBuf;
2564         int senderBufSize;
2565 };
2566
2567 #include "cmidirect.h"
2568
2569 #define MAXHANDLES 512
2570
2571 struct infiDirectHandleStruct;
2572
2573
/*
 * One entry of the singly linked queue of handles that are polled for
 * arrival of data.
 */
typedef struct directPollingQNodeStruct {
	struct infiDirectHandleStruct   *handle;      /* handle this entry stands for */
	struct directPollingQNodeStruct *next;        /* following entry, NULL for the last one */
	double                          *lastDouble;  /* location of the last double of the handle's buffer */
} directPollingQNode;
2579
2580 typedef struct infiDirectHandleStruct{
2581         int id;
2582         void *buf;
2583         int size;
2584         struct ibv_mr *key;
2585         void (*callbackFnPtr)(void *);
2586         void *callbackData;
2587 //      struct infiDirectRequestPacket *packet;
2588         struct infiDirectUserHandle userHandle;
2589         struct infiRdmaPacket *rdmaPacket;
2590         directPollingQNode pollingQNode;
2591 }       infiDirectHandle;
2592
2593 typedef struct infiDirectHandleTableStruct{
2594         infiDirectHandle handles[MAXHANDLES];
2595         struct infiDirectHandleTableStruct *next;
2596 } infiDirectHandleTable;
2597
2598
2599 // data structures 
2600
2601 directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;
2602
2603 static infiDirectHandleTable **sendHandleTable=NULL;
2604 static infiDirectHandleTable **recvHandleTable=NULL;
2605
2606 static int *recvHandleCount=NULL;
2607
2608 void addHandleToPollingQ(infiDirectHandle *handle){
2609 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2610         directPollingQNode *newNode = &(handle->pollingQNode);
2611         newNode->handle = handle;
2612         newNode->next = NULL;
2613         if(headDirectPollingQ==NULL){
2614                 /*empty pollingQ*/
2615                 headDirectPollingQ = newNode;
2616                 tailDirectPollingQ = newNode;
2617         }else{
2618                 tailDirectPollingQ->next = newNode;
2619                 tailDirectPollingQ = newNode;
2620         }
2621 };
2622 /*
2623 infiDirectHandle *removeHandleFromPollingQ(){
2624         if(headDirectPollingQ == NULL){
2625                 //polling Q is empty
2626                 return NULL;
2627         }
2628         directPollingQNode *retNode = headDirectPollingQ;
2629         if(headDirectPollingQ == tailDirectPollingQ){
2630                 //PollingQ has one node 
2631                 headDirectPollingQ = tailDirectPollingQ = NULL;
2632         }else{
2633                 headDirectPollingQ = headDirectPollingQ->next;
2634         }
2635         infiDirectHandle *retHandle = retNode->handle;
2636         free(retNode);
2637         return retHandle;
2638 }*/
2639
2640 static inline infiDirectHandleTable **createHandleTable(){
2641         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2642         int i;
2643         for(i=0;i<_Cmi_numnodes;i++){
2644                 table[i] = NULL;
2645         }
2646         return table;
2647 }
2648
2649 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2650         *tableIdx = handle/MAXHANDLES;
2651         *idx = handle%MAXHANDLES;
2652 };
2653
/*
 * Store initialValue in the last sizeof(double) bytes of a buffer.
 *
 * @param recvBuf       start of the buffer
 * @param recvBufSize   size of the buffer in bytes
 * @param initialValue  value written into the final double
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	char *bufEnd = (char *)recvBuf + recvBufSize;
	double *sentinel = (double *)(bufEnd - sizeof(double));

	*sentinel = initialValue;
}
2661
2662
2663 /**
2664  To be called on the receiver to create a handle and return its number
2665 **/
2666 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2667         int newHandle;
2668         int tableIdx,idx;
2669         int i;
2670         infiDirectHandleTable *table;
2671         struct infiDirectUserHandle userHandle;
2672         
2673         CmiAssert(recvBufSize > sizeof(double));
2674         
2675         if(recvHandleTable == NULL){
2676                 recvHandleTable = createHandleTable();
2677                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2678                 for(i=0;i<_Cmi_numnodes;i++){
2679                         recvHandleCount[i] = -1;
2680                 }
2681         }
2682         if(recvHandleTable[senderNode] == NULL){
2683                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2684                 recvHandleTable[senderNode]->next = NULL;               
2685         }
2686         
2687         newHandle = ++recvHandleCount[senderNode];
2688         CmiAssert(newHandle >= 0);
2689         
2690         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2691         
2692         table = recvHandleTable[senderNode];
2693         for(i=0;i<tableIdx;i++){
2694                 if(table->next ==NULL){
2695                         table->next = malloc(sizeof(infiDirectHandleTable));
2696                         table->next->next = NULL;
2697                 }
2698                 table = table->next;
2699         }
2700         table->handles[idx].id = newHandle;
2701         table->handles[idx].buf = recvBuf;
2702         table->handles[idx].size = recvBufSize;
2703 #if CMI_DIRECT_DEBUG
2704         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2705 #endif
2706         table->handles[idx].callbackFnPtr = callbackFnPtr;
2707         table->handles[idx].callbackData = callbackData;
2708         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2709         CmiAssert(table->handles[idx].key != NULL);
2710
2711 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2712         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2713         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2714         
2715         userHandle.handle = newHandle;
2716         userHandle.recverNode = _Cmi_mynode;
2717         userHandle.senderNode = senderNode;
2718         userHandle.recverBuf = recvBuf;
2719         userHandle.recverBufSize = recvBufSize;
2720         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2721         userHandle.initialValue = initialValue;
2722         
2723         table->handles[idx].userHandle = userHandle;
2724         
2725         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2726
2727   {
2728          int index = table->handles[idx].size - sizeof(double);
2729    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2730         } 
2731         
2732         addHandleToPollingQ(&(table->handles[idx]));
2733         
2734 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2735         
2736         return userHandle;
2737 }
2738
2739 /****
2740  To be called on the sender to attach the sender's buffer to this handle
2741 ******/
2742 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2743         int tableIdx,idx;
2744         int i;
2745         int handle = userHandle->handle;
2746         int recverNode  = userHandle->recverNode;
2747
2748         infiDirectHandleTable *table;
2749
2750         if(sendHandleTable == NULL){
2751                 sendHandleTable = createHandleTable();
2752         }
2753         if(sendHandleTable[recverNode] == NULL){
2754                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2755                 sendHandleTable[recverNode]->next = NULL;
2756         }
2757         
2758         CmiAssert(handle >= 0);
2759         calcHandleTableIdx(handle,&tableIdx,&idx);
2760         
2761         table = sendHandleTable[recverNode];
2762         for(i=0;i<tableIdx;i++){
2763                 if(table->next ==NULL){
2764                         table->next = malloc(sizeof(infiDirectHandleTable));
2765                         table->next->next = NULL;
2766                 }
2767                 table = table->next;
2768         }
2769
2770         table->handles[idx].id = handle;
2771         table->handles[idx].buf = sendBuf;
2772
2773         table->handles[idx].size = sendBufSize;
2774 #if CMI_DIRECT_DEBUG
2775         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2776 #endif
2777         table->handles[idx].callbackFnPtr = table->handles[idx].callbackData = NULL;
2778         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2779         CmiAssert(table->handles[idx].key != NULL);
2780         table->handles[idx].userHandle = *userHandle;
2781         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2782         
2783         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2784         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2785         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2786         
2787         
2788 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2789         table->handles[idx].packet->senderProc = _Cmi_mynode;
2790         table->handles[idx].packet->handle = handle;
2791         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2792         table->handles[idx].packet->senderBuf = sendBuf;
2793         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2794         
2795         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2796 };
2797
2798
2799
2800
2801
2802 /****
2803 To be called on the sender to do the actual data transfer
2804 ******/
2805 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2806         int handle = userHandle->handle;
2807         int recverNode = userHandle->recverNode;
2808         if(recverNode == _Cmi_mynode){
2809                 /*when the sender and receiver are on the same
2810                 processor, just look up the sender and receiver
2811                 buffers and do a memcpy*/
2812
2813                 infiDirectHandleTable *senderTable;
2814                 infiDirectHandleTable *recverTable;
2815                 
2816                 int tableIdx,idx,i;
2817
2818                 
2819                 /*find entry for this handle in sender table*/
2820                 calcHandleTableIdx(handle,&tableIdx,&idx);
2821                 CmiAssert(sendHandleTable!= NULL);
2822                 senderTable = sendHandleTable[_Cmi_mynode];
2823                 CmiAssert(senderTable != NULL);
2824                 for(i=0;i<tableIdx;i++){
2825                         senderTable = senderTable->next;
2826                 }
2827
2828                 /**find entry for this handle in recver table*/
2829                 recverTable = recvHandleTable[recverNode];
2830                 CmiAssert(recverTable != NULL);
2831                 for(i=0;i< tableIdx;i++){
2832                         recverTable = recverTable->next;
2833                 }
2834                 
2835                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
2836                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
2837 #if CMI_DIRECT_DEBUG
2838                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
2839 #endif
2840                 // The polling Q should find you and handle the callback and pollingq entry
2841                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
2842                 
2843
2844         }else{
2845                 infiPacket packet;
2846                 int tableIdx,idx;
2847                 int i;
2848                 OtherNode node;
2849                 infiDirectHandleTable *table;
2850
2851                 calcHandleTableIdx(handle,&tableIdx,&idx);
2852
2853                 table = sendHandleTable[recverNode];
2854                 CmiAssert(table != NULL);
2855                 for(i=0;i<tableIdx;i++){
2856                         table = table->next;
2857                 }
2858
2859 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
2860 #if CMI_DIRECT_DEBUG
2861                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
2862 #endif
2863
2864                 
2865                 {
2866                         
2867                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
2868                         struct ibv_sge list = {
2869                                 .addr = (uintptr_t )table->handles[idx].buf,
2870                                 .length = table->handles[idx].size,
2871                                 .lkey   = table->handles[idx].key->lkey
2872                         };
2873                         
2874                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
2875
2876                         struct ibv_send_wr *bad_wr;
2877                         struct ibv_send_wr wr = {
2878                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
2879                                 .sg_list = &list,
2880                                 .num_sge = 1,
2881                                 .opcode = IBV_WR_RDMA_WRITE,
2882                                 .send_flags = IBV_SEND_SIGNALED,
2883                                 
2884                                 .wr.rdma = {
2885                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
2886                                         .rkey = remoteKey->rkey
2887                                 }
2888                         };
2889                         /** post and rdma_read that is a rdma get*/
2890                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
2891                                 CmiAssert(0);
2892                         }
2893                 }
2894
2895         /*      MallocInfiPacket (packet);
2896                 {
2897                         packet->size = sizeof(struct infiDirectRequestPacket);
2898                         packet->buf = (char *)(table->handles[idx].packet);
2899                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
2900                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
2901                 }*/
2902         }
2903
2904 };
2905
2906 /**** need not be called the first time *********/
2907 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
2908   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
2909 }
2910
2911 /**** need not be called the first time *********/
2912 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
2913         int handle = userHandle->handle;
2914         int tableIdx,idx,i;
2915         infiDirectHandleTable *table;
2916         calcHandleTableIdx(handle,&tableIdx,&idx);
2917         
2918         table = recvHandleTable[userHandle->senderNode];
2919         CmiAssert(table != NULL);
2920         for(i=0;i<tableIdx;i++){
2921                 table = table->next;
2922         }
2923 #if CMI_DIRECT_DEBUG
2924   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
2925 #endif  
2926         addHandleToPollingQ(&(table->handles[idx]));
2927         
2928
2929 }
2930
/**** Make the handle ready for the next transfer: first re-arm the sentinel
      in the receive buffer, then put the handle back on the polling queue.
      Need not be called the first time. *********/
void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
	/* exactly the two steps the sibling entry points perform, in this order */
	CmiDirect_readyMark(userHandle);
	CmiDirect_readyPollQ(userHandle);
}
2952
2953
2954 static int receivedDirectMessage(infiDirectHandle *handle){
2955 //      int index = handle->size - sizeof(double);
2956 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
2957         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
2958                 return 0;
2959         }else{
2960                 (*(handle->callbackFnPtr))(handle->callbackData);       
2961                 return 1;
2962         }
2963         
2964 }
2965
2966
2967 static void pollCmiDirectQ(){
2968         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
2969         while(ptr != NULL){
2970                 if(receivedDirectMessage(ptr->handle)){
2971 #if CMI_DIRECT_DEBUG
2972       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
2973 #endif
2974                         directPollingQNode *delPtr = ptr;
2975                         /** has been received and delete this node***/
2976                         if(prevPtr == NULL){
2977                                 /** first in the pollingQ**/
2978                                 if(headDirectPollingQ == tailDirectPollingQ){
2979                                         /**only node in pollingQ****/
2980                                         headDirectPollingQ = tailDirectPollingQ = NULL;
2981                                 }else{
2982                                         headDirectPollingQ = headDirectPollingQ->next;
2983                                 }
2984                         }else{
2985                                 if(ptr == tailDirectPollingQ){
2986                                         /**last node is being deleted**/
2987                                         tailDirectPollingQ = prevPtr;
2988                                 }
2989                                 prevPtr->next = ptr->next;
2990                         }
2991                         ptr = ptr->next;
2992                 //      free(delPtr);
2993                 }else{
2994                         prevPtr = ptr;
2995                         ptr = ptr->next;
2996                 }
2997         }
2998 }
2999
3000
3001 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3002         int senderProc = directRequestPacket->senderProc;
3003         int handle = directRequestPacket->handle;
3004         int tableIdx,idx,i;
3005         infiDirectHandleTable *table;
3006         OtherNode node = nodes_by_pe[senderProc];
3007
3008         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3009
3010         calcHandleTableIdx(handle,&tableIdx,&idx);
3011
3012         table = recvHandleTable[senderProc];
3013         CmiAssert(table != NULL);
3014         for(i=0;i<tableIdx;i++){
3015                 table = table->next;
3016         }
3017         
3018         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3019         
3020         {
3021                 struct ibv_sge list = {
3022                         .addr = (uintptr_t )table->handles[idx].buf,
3023                         .length = table->handles[idx].size,
3024                         .lkey   = table->handles[idx].key->lkey
3025                 };
3026
3027                 struct ibv_send_wr *bad_wr;
3028                 struct ibv_send_wr wr = {
3029                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3030                         .sg_list = &list,
3031                         .num_sge = 1,
3032                         .opcode = IBV_WR_RDMA_READ,
3033                         .send_flags = IBV_SEND_SIGNALED,
3034                         .wr.rdma = {
3035                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3036                                 .rkey = directRequestPacket->senderKey.rkey
3037                         }
3038                 };
3039 //       post and rdma_read that is a rdma get
3040                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3041                         CmiAssert(0);
3042                 }
3043         }
3044                         
3045         
3046 };*/
3047 /*
3048 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3049         MACHSTATE(3,"processDirectWC");
3050         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3051         (*(handle->callbackFnPtr))(handle->callbackData);
3052 };
3053 */
3054
3055 static void sendBarrierMessage(int pe)
3056 {
3057   /* we will only need one packet */
3058   int size=32;
3059   OtherNode  node = nodes + pe;
3060   infiPacket packet;
3061   MallocInfiPacket(packet);
3062   packet->size = size;
3063   packet->buf = CmiAlloc(size);
3064   packet->header.code = INFIBARRIERPACKET;
3065   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3066   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3067   /*  pollSendCq(0);*/
3068   EnqueuePacket(node,packet,size,key);
3069 }
3070
3071 static void recvBarrierMessage()
3072 {
3073   int i;
3074   int ne;
3075   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3076   struct ibv_wc wc[1];
3077   struct ibv_wc *recvWC;
3078   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3079   int toBuffer=1; // buffer without processing recvd messages
3080   int barrierReached=0;
3081   struct infiBuffer *buffer = NULL;
3082   struct infiPacketHeader *header = NULL;
3083   int nodeNo=-1;
3084   int len=-1;
3085   while(!barrierReached)
3086     {
3087       /* gengbin's semantic will implode if more than one q is polled at a time */
3088       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3089       //        CmiAssert(ne >=0);
3090       if(ne != 0){
3091         MACHSTATE1(3,"recvBarrier ne %d",ne);
3092       }
3093       pollSendCq(1); 
3094       for(i=0;i<ne;i++){
3095         if(wc[i].status != IBV_WC_SUCCESS){
3096           CmiAssert(0);
3097         }
3098         switch(wc[i].opcode){
3099         case IBV_WC_RECV: /* we have something to consider*/
3100           recvWC=&wc[i];
3101           buffer = (struct infiBuffer *) recvWC->wr_id; 
3102           header = (struct infiPacketHeader *)buffer->buf;
3103           nodeNo = header->nodeNo;
3104           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3105
3106           if(header->code & INFIPACKETCODE_DATA){
3107             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3108           }
3109           if(header->code & INFIDUMMYPACKET){
3110             MACHSTATE(3,"Dummy packet");
3111           }
3112           if(header->code & INFIBARRIERPACKET){
3113             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3114             // now we are done
3115             barrierReached=1;
3116             /* semantically questionable */
3117             //processAllBufferedMsgs();
3118             //return;
3119           }
3120           if(rdma && header->code & INFIRDMA_START){
3121             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3122             //          if(toBuffer){
3123             //TODO: make a function of this and use for both acks and requests
3124             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3125             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3126             *copyPacket = *rdmaPacket;
3127             copyPacket->fromNodeNo = nodeNo;
3128             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3129             context->bufferedRdmaRequests = copyPacket;
3130             copyPacket->next = tmp;
3131             copyPacket->prev = NULL;
3132             if(tmp != NULL){
3133               tmp->prev = copyPacket;
3134             }
3135             /*          }else{
3136                         processRdmaRequest(rdmaPacket,nodeNo,0);
3137                         }*/
3138           }
3139           if(rdma && header->code & INFIRDMA_ACK){
3140             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3141             processRdmaAck(rdmaPacket);
3142           }
3143           {
3144             struct ibv_sge list = {
3145               .addr     = (uintptr_t) buffer->buf,
3146               .length = buffer->size,
3147               .lkey     = buffer->key->lkey
3148             };
3149         
3150             struct ibv_recv_wr wr = {
3151               .wr_id = (uint64_t)buffer,
3152               .sg_list = &list,
3153               .num_sge = 1,
3154               .next = NULL
3155             };
3156             struct ibv_recv_wr *bad_wr;
3157         
3158             if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3159               CmiAssert(0);
3160             }
3161           }
3162
3163           break;
3164         default:
3165           CmiAbort("Wrong type of work completion object in recvq");
3166           break;
3167         }
3168       }
3169     }
3170   /* semantically questionable */
3171   //  processAllBufferedMsgs();
3172 }
3173
3174
3175 /* happen at node level */
3176 int CmiBarrier()
3177 {
3178   int len, size, i;
3179   int status;
3180   int count = 0;
3181   OtherNode node;
3182   int numnodes = CmiNumNodes();
3183   if (CmiMyRank() == 0) {
3184     /* every one send to pe 0 */
3185     if (CmiMyNode() != 0) {
3186       sendBarrierMessage(0);
3187     }
3188     /* printf("[%d] HERE\n", CmiMyPe()); */
3189     if (CmiMyNode() == 0) 
3190     {
3191       for (count = 1; count < numnodes; count ++) 
3192       {
3193         recvBarrierMessage();
3194       }
3195       /* pe 0 broadcast */
3196       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3197         int p = i;
3198         if (p > numnodes - 1) break;
3199         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3200         sendBarrierMessage(p);
3201       }
3202     }
3203     /* non 0 node waiting */
3204     if (CmiMyNode() != 0) 
3205     {
3206       recvBarrierMessage();
3207       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3208         int p = CmiMyNode();
3209         p = BROADCAST_SPANNING_FACTOR*p + i;
3210         if (p > numnodes - 1) break;
3211         p = p%numnodes;
3212         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3213         sendBarrierMessage(p);
3214       }
3215     }
3216   }
3217   CmiNodeAllBarrier();
3218   processAllBufferedMsgs();
3219   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3220 }
3221
/*
 * One-sided barrier: on rank 0, every node other than node 0 sends a barrier
 * packet to node 0 and goes on, while node 0 waits until it has received
 * CmiNumNodes()-1 barrier packets.  Afterwards all ranks meet in
 * CmiNodeAllBarrier() and the messages buffered while waiting are processed.
 *
 * @return always 0 (the function is declared int but used to fall off the
 *         end without returning a value)
 */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    }
    else {
      for (i=0; i<CmiNumNodes()-1; i++)
      {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  return 0;
}
3241