7d9a228a77d66e96b13d44be98428450f9f0d496
[charm.git] / src / arch / net / machine-ibverbs.c
1 /** @file
2  *
3  * Ibverbs (infiniband)  implementation of Converse NET version
4  * @ingroup NET
5  * contains only Ibverbs specific code for:
6  * - CmiMachineInit()
7  * - CmiCommunicationInit()
8  * - CmiNotifyStillIdle()
9  * - DeliverViaNetwork()
10  * - CommunicationServer()
11  * - CmiMachineExit()
12
13   created by 
14         Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
15 */
16
17 /**
18  * @addtogroup NET
19  * @{
20  */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <malloc.h>
28 #include <getopt.h>
29 #include <time.h>
30
31 #include <infiniband/verbs.h>
32
33 enum ibv_mtu mtu = IBV_MTU_2048; /* path MTU used when each qp is moved to RTR in initInfiOtherNodeData() */
34 static int page_size; /* set from sysconf(_SC_PAGESIZE) in allocateInfiBufferPool() */
35 static int mtu_size; /* set to 1200 in CmiMachineInit() */
36 static int packetSize; /* mtu_size*4; also the size of every receive buffer posted by infiPostInitialRecvs() */
37 static int dataSize; /* packetSize minus sizeof(struct infiPacketHeader) */
38 static int rdma; /* set to 1 in CmiMachineInit() */
39 static int rdmaThreshold; /* set to 22000 in CmiMachineInit() */
40 static int firstBinSize; /* set to 120 in CmiMachineInit(); asserted to be smaller than rdmaThreshold */
41 static int blockAllocRatio; /* set to 16 in CmiMachineInit() */
42 static int blockThreshold; /* set to 8 in CmiMachineInit() */
43
44
45 static int maxRecvBuffers; /* set to 8000 in CmiMachineInit(); number of receives posted and size of the recv cq */
46 static int maxTokens; /* equal to maxRecvBuffers; initial value of context->tokensLeft */
47 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
48 static int sendPacketPoolSize; /*total number of send buffers created*/
49
50 static double _startTime=0;
51 static int regCount; /* statistics; reset in CmiMachineInit() when CMK_IBVERBS_STATS is on */
52
53 static int pktCount; /* statistics; incremented per packet in EnqueuePacket() when CMK_IBVERBS_STATS is on */
54 static int msgCount; /* statistics; printed by CmiMachineExit() when CMK_IBVERBS_STATS is on */
55 static int minTokensLeft; /* statistics; lowest context->tokensLeft seen in EnqueuePacket() */
56
57
58 static double regTime; /* statistics; reset in CmiMachineInit() when CMK_IBVERBS_STATS is on */
59
60 static double processBufferedTime; /* statistics; printed by CmiMachineExit() */
61 static int processBufferedCount; /* statistics; printed by CmiMachineExit() */
62
63 #define CMK_IBVERBS_STATS 0
64 #define CMK_IBVERBS_TOKENS_FLOW 1
65 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on 
66 #define CMK_IBVERBS_DEBUG 0
67 #define CMI_DIRECT_DEBUG 0
68 #define WC_LIST_SIZE 32
69 /*#define WC_BUFFER_SIZE 100*/
70
71
72
73
74 #define INCTOKENS_FRACTION 0.04
75 #define INCTOKENS_INCREASE .50
76
77 // flag for using a pool for every thread
78 #define THREAD_MULTI_POOL 0
79
80 //TODO: erase this constant value if ++ppn capture is possible
81 #define INFI_NODE_SIZE 8
82
83 #if THREAD_MULTI_POOL 
84 #include "pcqueue.h"
85 PCQueue **queuePool;
86 #endif
87
88 #define INFIBARRIERPACKET 128
89
90 struct infiIncTokenAckPacket{
91         int a;
92 };
93
94
95
96 typedef struct {
97 char none;  
98 } CmiIdleState;
99
100
101 /********
102 ** The notify idle methods
103 ***/
104
105 static CmiIdleState *CmiNotifyGetState(void) { return NULL; }
106
107 static void CmiNotifyStillIdle(CmiIdleState *s);
108
109
110 static void CmiNotifyBeginIdle(CmiIdleState *s)
111 {
112   CmiNotifyStillIdle(s);
113 }
114
115 void CmiNotifyIdle(void) {
116   CmiNotifyStillIdle(NULL);
117 }
118
119 /***************
120 Data Structures 
121 ***********************/
122
123 /******
124         This is a header attached to the beginning of every infiniband packet
125 *******/
126 #define INFIPACKETCODE_DATA 1
127 #define INFIPACKETCODE_INCTOKENS 2
128 #define INFIRDMA_START 4
129 #define INFIRDMA_ACK 8
130 #define INFIDIRECT_REQUEST 16
131 #define INFIPACKETCODE_INCTOKENSACK 32
132 #define INFIDUMMYPACKET 64
133
134 struct infiPacketHeader{
135         char code; /* packet-type flags; EnqueuePacket() ORs INFIPACKETCODE_INCTOKENS into it, so it is used as a bitmask */
136         int nodeNo; /* CmiMachineInit() stores _Cmi_mynode in the template header that newPacket() copies */
137 #if     CMK_IBVERBS_DEBUG
138         int psn; /* debug builds only: set to the pre-incremented per-node counter in EnqueuePacket() */
139 #endif  
140 };
141
142 /*
143         Types of rdma packets
144 */
145 #define INFI_MESG 1 
146 #define INFI_DIRECT 2
147
148 struct infiRdmaPacket{
149         int fromNodeNo;
150         int type; /* presumably INFI_MESG or INFI_DIRECT (defined just above) — TODO confirm at the use sites */
151         struct ibv_mr key;
152         struct ibv_mr *keyPtr;
153         int remoteSize;
154         char *remoteBuf;
155         void *localBuffer;
156         OutgoingMsg ogm;
157         struct infiRdmaPacket *next,*prev; /* list links; struct infiContext holds list heads of this type (bufferedRdmaAcks, bufferedRdmaRequests) */
158 };
159
160
161 /** Represents a buffer that is used to receive messages
162 */
163 #define BUFFER_RECV 1
164 #define BUFFER_RDMA 2
165 struct infiBuffer{
166         int type; /* BUFFER_RECV or BUFFER_RDMA; allocateInfiBufferPool() sets BUFFER_RECV */
167         char *buf; /* start of this buffer; in allocateInfiBufferPool() it is a slice of one large allocation */
168         int size; /* size of buf in bytes */
169         struct ibv_mr *key; /* registered memory region covering buf; its lkey is used when the receive is posted */
170 };
171
172
173
174 /** At the moment it is a simple pool with just a list of buffers
175         * TODO; extend it to make it an element in a linklist of pools
176 */
177 struct infiBufferPool{
178         int numBuffers; /* number of entries in buffers */
179         struct infiBuffer *buffers; /* array of numBuffers descriptors */
180         struct infiBufferPool *next; /* allocateInfiBufferPool() sets this to NULL */
181 };
182
183 /*****
184         It is the structure for the send buffers that are used
185         to send messages to other nodes
186 ********/
187
188
189 typedef struct infiPacketStruct {       
190         char *buf; /* payload pointer; EnqueuePacket() uses it as the address of elemList[1] */
191         int size; /* set to -1 by newPacket() and FreeInfiPacket() */
192         struct infiPacketHeader header; /* copied from context->header in newPacket(); sent as elemList[0] */
193         struct ibv_mr *keyHeader; /* memory region of the packet itself (from its chunk metadata); supplies the lkey for elemList[0] */
194         struct OtherNodeStruct *destNode; /* destination node, set in EnqueuePacket() */
195         struct infiPacketStruct *next; /* link used by context->infiPacketFreeList */
196         OutgoingMsg ogm;
197         struct ibv_sge elemList[2]; /* [0] = header, [1] = payload */
198         struct ibv_send_wr wr; /* send work request; wr_id holds the packet pointer, sg_list points at elemList */
199 }* infiPacket;
200
201 /*
202 typedef struct infiBufferedWCStruct{
203         struct ibv_wc wcList[WC_BUFFER_SIZE];
204         int count;
205         struct infiBufferedWCStruct *next,*prev;
206 } * infiBufferedWC;
207 */
208
209 #define BCASTLIST_SIZE 50
210
211 struct infiBufferedBcastStruct{
212         char *msg;
213         int size;
214         int broot;
215         int asm_rank;
216         int valid;
217 };
218
219 typedef struct infiBufferedBcastPoolStruct{
220         struct infiBufferedBcastPoolStruct *next,*prev;
221         struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
222         int count;
223
224 } *infiBufferedBcastPool;
225
226
227
228
229 /***
230         This structure represents the data needed by the infiniband
231         communication routines of a node
232         TODO: add locking for the smp version
233 */
234 struct infiContext {
235         struct ibv_context      *context; /* device context returned by ibv_open_device() in CmiMachineInit() */
236         
237         fd_set  asyncFds; /* only referenced from commented-out code in CmiMachineInit() */
238         struct timeval tmo; /* only referenced from commented-out code in CmiMachineInit() */
239         
240         int ibPort; /* CmiMachineInit() sets this to 1 */
241 //      struct ibv_comp_channel *channel;
242         struct ibv_pd           *pd; /* protection domain from ibv_alloc_pd() */
243         struct ibv_cq           *sendCq; /* send completion queue, sendCqSize entries (createLocalQps) */
244         struct ibv_cq   *recvCq; /* receive completion queue, recvCqSize entries (createLocalQps) */
245         struct ibv_srq  *srq; /* shared receive queue used by every qp; created in createLocalQps() when numNodes > 1 */
246         
247         struct ibv_qp           **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
248                                                                                                 //It is used between CmiMachineInit and the call to node_addresses_store
249                                                                                                 //when the qps are stored in the corresponding OtherNodes
250
251         struct infiAddr *localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
252
253         infiPacket infiPacketFreeList; /* head of the free send-packet list used by MallocInfiPacket / FreeInfiPacket */
254         
255         struct infiBufferPool *recvBufferPool; /* assigned in infiPostInitialRecvs() when there is more than one node */
256
257         struct infiPacketHeader header; /* template header copied into every packet by newPacket() */
258
259         int srqSize; /* maxRecvBuffers+2 (createLocalQps) */
260         int sendCqSize,recvCqSize; /* maxTokens+2 and maxRecvBuffers respectively (createLocalQps) */
261         int tokensLeft; /* starts at maxTokens; EnqueuePacket() decrements it per posted send; getFreeTokens() spins while it is 0 */
262
263         infiBufferedBcastPool bufferedBcastList;
264         
265         struct infiRdmaPacket *bufferedRdmaAcks;
266         
267         struct infiRdmaPacket *bufferedRdmaRequests;
268 /*      infiBufferedWC infiBufferedRecvList;*/
269         
270         int insideProcessBufferedBcasts;
271 };
272
273 static struct infiContext *context;
274
275
276
277
278 /** Represents a qp used to send messages to another node
279  There is one for each remote node
280 */
281 struct infiAddr {
282         int lid,qpn,psn; /* filled in createLocalQps(): local port lid, qp number, and a random 24-bit starting psn */
283 };
284
285 /**
286  Stored in the OtherNode structure in machine-dgram.c 
287  Store the per node data for ibverbs layer
288 */
289 enum { INFI_HEADER_DATA=21,INFI_DATA};
290
291 struct infiOtherNodeData{
292         struct ibv_qp *qp ; /* queue pair to this node; taken from context->qp[node] in initInfiOtherNodeData() */
293         int state;// does it expect a packet with a header (first packet) or one without
294         int totalTokens; /* read by EnqueuePacket() only when CMK_IBVERBS_INCTOKENS is enabled */
295         int tokensLeft; /* read by EnqueuePacket() only when CMK_IBVERBS_INCTOKENS is enabled */
296         int nodeNo; /* number of the remote node */
297         
298         int postedRecvs;
299         int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
300 #if     CMK_IBVERBS_DEBUG       
301         int psn; /* debug only: send counter, pre-incremented per packet in EnqueuePacket() */
302         int recvPsn; /* debug only: initialised to 0 in initInfiOtherNodeData() */
303 #endif  
304 };
305
306
307 /********************************
308 Memory management structures and types
309 *****************/
310
311 struct infiCmiChunkHeaderStruct;
312
313 typedef struct infiCmiChunkMetaDataStruct {
314         struct ibv_mr *key; /* memory region of the chunk; newPacket() reads it through METADATAFIELD() to get the lkey */
315         int poolIdx; /* presumably an index into infiCmiChunkPools (or INFIMULTIPOOL) — TODO confirm in the allocator */
316         void *nextBuf;
317         struct infiCmiChunkHeaderStruct *owner;
318         int count;
319
320 #if THREAD_MULTI_POOL
321         int parentPe;                                           // the PE that allocated the buffer and must release it
322 #endif
323 } infiCmiChunkMetaData;
324
325
326
327
/* Access the metaData field of the infiCmiChunkHeader stored immediately
 * before buffer m. The argument is parenthesized so that an expression such
 * as METADATAFIELD(buf + off) is cast as a whole instead of casting only buf. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)
329
330 typedef struct {
331         int size;//without infiCmiChunkHeader
332         void *startBuf;
333         int count;
334 } infiCmiChunkPool;
335
336 #define INFINUMPOOLS 20
337 #define INFIMAXPERPOOL 100
338 #define INFIMULTIPOOL -5
339
340 #if THREAD_MULTI_POOL
341 infiCmiChunkPool **infiCmiChunkPools;
342 //TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
343 #else
344 infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
345 #endif
346
347 static void initInfiCmiChunkPools();
348
349
350 static inline infiPacket newPacket(){
351         infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
352         pkt->size = -1;
353         pkt->header = context->header;
354         pkt->next = NULL;
355         pkt->destNode = NULL;
356         pkt->keyHeader = METADATAFIELD(pkt)->key;
357         pkt->ogm=NULL;
358         CmiAssert(pkt->keyHeader!=NULL);
359         
360         pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
361         pkt->elemList[0].length = sizeof(struct infiPacketHeader);
362         pkt->elemList[0].lkey = pkt->keyHeader->lkey;
363         
364         pkt->wr.wr_id = (uint64_t)pkt;
365         pkt->wr.sg_list = &(pkt->elemList[0]);
366         pkt->wr.num_sge = 2;
367         pkt->wr.opcode = IBV_WR_SEND;
368         pkt->wr.send_flags = IBV_SEND_SIGNALED;
369         pkt->wr.next = NULL;
370         
371         return pkt;
372 };
373
/*
 * FreeInfiPacket(pkt): reset pkt and push it on context->infiPacketFreeList.
 * The argument is evaluated exactly once.
 * Deliberately kept as a bare brace block (not do{}while(0)) so that every
 * existing call site keeps compiling whether or not it has a trailing ';'.
 */
#define FreeInfiPacket(pkt){ \
        infiPacket freed_pkt_ = (pkt); \
        freed_pkt_->size = -1;\
        freed_pkt_->ogm=NULL;\
        freed_pkt_->next = context->infiPacketFreeList; \
        context->infiPacketFreeList = freed_pkt_; \
}

/*
 * MallocInfiPacket(pkt): pop a packet from context->infiPacketFreeList into
 * the lvalue pkt, or create one with newPacket() when the list is empty.
 * The temporary has a name unlikely to collide with a caller variable; the
 * previous name `p` silently turned MallocInfiPacket(p) into a no-op.
 */
#define MallocInfiPacket(pkt) { \
        infiPacket malloced_pkt_ = context->infiPacketFreeList; \
        if(malloced_pkt_ == NULL){ malloced_pkt_ = newPacket();} \
                 else{context->infiPacketFreeList = malloced_pkt_->next; } \
        (pkt) = malloced_pkt_;\
}
387
388
389
390
391 /******************CmiMachineInit and its helper functions*/
392 static inline int pollSendCq(const int toBuffer);
393
394 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
395 static uint16_t getLocalLid(struct ibv_context *context, int port);
396 static int  checkQp(struct ibv_qp *qp){
397         struct ibv_qp_attr attr;
398         struct ibv_qp_init_attr init_attr;
399                  
400         ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP  ,&init_attr);
401         if(attr.cur_qp_state != IBV_QPS_RTS){
402                 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
403                 return 0;
404         }
405         return 1;
406 }
407 static void checkAllQps(){
408         int i;
409         for(i=0;i<_Cmi_numnodes;i++){
410                 if(i != _Cmi_mynode){
411                         if(!checkQp(nodes[i].infiData->qp)){
412                                 pollSendCq(0);
413                                 CmiAssert(0);
414                         }
415                 }
416         }
417 }
418
419 #if CMK_IBVERBS_FAST_START
420 static void send_partial_init();
421 #endif
422
423 static void CmiMachineInit(char **argv){
424         struct ibv_device **devList;
425         struct ibv_device *dev;
426         int ibPort;
427         int i;
428         int calcMaxSize;
429         infiPacket *pktPtrs;
430         struct infiRdmaPacket **rdmaPktPtrs;
431
432         MACHSTATE(3,"CmiMachineInit {");
433         MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
434         
435         //TODO: make the device and ibport configureable by commandline parameter
436         //Check example for how to do that
437         devList =  ibv_get_device_list(NULL);
438         CmiAssert(devList != NULL);
439
440         dev = *devList;
441         CmiAssert(dev != NULL);
442
443         ibPort=1;
444
445         MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
446
447         context = (struct infiContext *)malloc(sizeof(struct infiContext));
448         
449         MACHSTATE1(3,"context allocated %p",context);
450         
451         //localAddr will store the local addresses of all the qps
452         context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*_Cmi_numnodes);
453         
454         MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
455         
456         context->ibPort = ibPort;
457         //the context for this infiniband device 
458         context->context = ibv_open_device(dev);
459         CmiAssert(context->context != NULL);
460         
461         MACHSTATE1(3,"device opened %p",context->context);
462
463 /*      FD_ZERO(&context->asyncFds);
464         FD_SET(context->context->async_fd,&context->asyncFds);
465         context->tmo.tv_sec=0;
466         context->tmo.tv_usec=0;
467         
468         MACHSTATE(3,"asyncFds zeroed and set");*/
469
470         //protection domain
471         context->pd = ibv_alloc_pd(context->context);
472         CmiAssert(context->pd != NULL);
473         MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
474
475   /******** At this point we know that this node is more or less serviceable
476         So, this is a good point for sending the partial init message for the fast
477         start case
478         Moreover, no work dependent on the number of nodes has started yet.
479         ************/
480
481 #if CMK_IBVERBS_FAST_START
482   send_partial_init();
483 #endif
484
485
486         context->header.nodeNo = _Cmi_mynode;
487
488         mtu_size=1200;
489         packetSize = mtu_size*4;
490         dataSize = packetSize-sizeof(struct infiPacketHeader);
491         
492         calcMaxSize=8000;
493 /*      if(_Cmi_numnodes*50 > calcMaxSize){
494                 calcMaxSize = _Cmi_numnodes*50;
495                 if(calcMaxSize > 10000){
496                         calcMaxSize = 10000;
497                 }
498         }*/
499 //      maxRecvBuffers=80;
500         maxRecvBuffers=calcMaxSize;
501         maxTokens = maxRecvBuffers;
502 //      maxTokens = 80;
503         context->tokensLeft=maxTokens;
504         //tokensPerProcessor=4;
505         if(_Cmi_numnodes > 1){
506                 createLocalQps(dev,ibPort,_Cmi_mynode,_Cmi_numnodes,context->localAddr);
507         }
508         
509         
510         //TURN ON RDMA
511         rdma=1;
512 //      rdmaThreshold=32768;
513         rdmaThreshold=22000;
514         firstBinSize = 120;
515         CmiAssert(rdmaThreshold > firstBinSize);
516         blockAllocRatio=16;
517         blockThreshold=8;
518
519         
520         
521         initInfiCmiChunkPools();
522                 
523
524         /*create the pool of send packets*/
525         sendPacketPoolSize = maxTokens/2;       
526         if(sendPacketPoolSize > 2000){
527                 sendPacketPoolSize = 2000;
528         }
529         
530         context->infiPacketFreeList=NULL;
531         pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
532
533         //Silly way of allocating the memory buffers (slow as well) but simplifies the code
534         for(i=0;i<sendPacketPoolSize;i++){
535                 MallocInfiPacket(pktPtrs[i]);   
536         }
537
538         for(i=0;i<sendPacketPoolSize;i++){
539                 FreeInfiPacket(pktPtrs[i]);     
540         }
541         free(pktPtrs);
542         
543         context->bufferedBcastList=NULL;
544         context->bufferedRdmaAcks = NULL;
545         context->bufferedRdmaRequests = NULL;
546         context->insideProcessBufferedBcasts=0;
547         
548         
549         if(rdma){
550 /*              int numPkts;
551                 int k;
552                 if( _Cmi_numnodes*4 < maxRecvBuffers/4){
553                         numPkts = _Cmi_numnodes*4;
554                 }else{
555                         numPkts = maxRecvBuffers/4;
556                 }
557                 
558                 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
559                 for(k=0;k<numPkts;k++){
560                         rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
561                 }
562                 
563                 for(k=0;k<numPkts;k++){
564                         CmiFree(rdmaPktPtrs[k]);
565                 }
566                 free(rdmaPktPtrs);*/
567         }
568         
569 /*      context->infiBufferedRecvList = NULL;*/
570 #if CMK_IBVERBS_STATS   
571         regCount =0;
572         regTime  = 0;
573
574         pktCount=0;
575         msgCount=0;
576
577         processBufferedCount=0;
578         processBufferedTime=0;
579
580         minTokensLeft = maxTokens;
581 #endif  
582
583         
584
585         MACHSTATE(3,"} CmiMachineInit");
586 }
587
/**
 * Communication initialisation hook. It does nothing in this layer.
 *
 * @param argv command line arguments (ignored)
 */
void CmiCommunicationInit(char **argv)
{
        (void)argv; /* intentionally unused */
}
591
592 /*********
593         Open a qp for every processor
594 *****/
595 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
596         int myLid;
597         int i;
598         
599         
600         //find my lid
601         myLid = getLocalLid(context->context,ibPort);
602         
603         MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
604
605         context->sendCqSize = maxTokens+2;
606         context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
607         CmiAssert(context->sendCq != NULL);
608         
609         MACHSTATE1(3,"sendCq created %p",context->sendCq);
610         
611         
612         context->recvCqSize = maxRecvBuffers;
613         context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
614         
615         MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
616         CmiAssert(context->recvCq != NULL);
617         
618         //array of queue pairs
619
620         context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
621
622         if(numNodes > 1)
623         {
624                 context->srqSize = (maxRecvBuffers+2);
625                 struct ibv_srq_init_attr srqAttr = {
626                         .attr = {
627                         .max_wr  = context->srqSize,
628                         .max_sge = 1
629                         }
630                 };
631                 context->srq = ibv_create_srq(context->pd,&srqAttr);
632                 CmiAssert(context->srq != NULL);
633         
634                 struct ibv_qp_init_attr initAttr = {
635                         .qp_type = IBV_QPT_RC,
636                         .send_cq = context->sendCq,
637                         .recv_cq = context->recvCq,
638                         .srq             = context->srq,
639                         .sq_sig_all = 0,
640                         .qp_context = NULL,
641                         .cap     = {
642                                 .max_send_wr  = maxTokens,
643                                 .max_send_sge = 2,
644                         },
645                 };
646                 struct ibv_qp_attr attr;
647
648                 attr.qp_state        = IBV_QPS_INIT;
649                 attr.pkey_index      = 0;
650                 attr.port_num        = ibPort;
651                 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
652
653 /*              MACHSTATE1(3,"context->pd %p",context->pd);
654                 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
655                 MACHSTATE1(3,"TEST QP %p",qp);*/
656
657                 for( i=0;i<numNodes;i++){
658                         if(i == myNode){
659                         }else{
660                                 localAddr[i].lid = myLid;
661                                 context->qp[i] = ibv_create_qp(context->pd,&initAttr);
662                         
663                                 MACHSTATE2(3,"qp[%d] created %p",i,context->qp[i]);
664                         
665                                 CmiAssert(context->qp[i] != NULL);
666                         
667                         
668                                 ibv_modify_qp(context->qp[i], &attr,
669                                           IBV_QP_STATE              |
670                                           IBV_QP_PKEY_INDEX         |
671                                         IBV_QP_PORT               |
672                                         IBV_QP_ACCESS_FLAGS);           
673
674                                 localAddr[i].qpn = context->qp[i]->qp_num;
675                                 localAddr[i].psn = lrand48() & 0xffffff;
676                                 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",i,localAddr[i].lid,localAddr[i].qpn,localAddr[i].psn);
677                         }
678                 }
679         }
680         MACHSTATE(3,"qps created");
681 };
682
683 void copyInfiAddr(ChInfiAddr *qpList){
684         int qpListIdx=0;
685         int i;
686         MACHSTATE1(3,"copyInfiAddr _Cmi_mynode %d",_Cmi_mynode);
687         for(i=0;i<_Cmi_numnodes;i++){
688                 if(i == _Cmi_mynode){
689                 }else{
690                         qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
691                         qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
692                         qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);                    
693                         qpListIdx++;
694                 }
695         }
696 }
697
698
699 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
700         struct ibv_port_attr attr;
701
702         if (ibv_query_port(dev_context, port, &attr))
703                 return 0;
704
705         return attr.lid;
706 }
707
708 /**************** END OF CmiMachineInit and its helper functions*/
709
710 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
711 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
712
713 /* Initial the infiniband specific data for a remote node
714         1. connect the qp and store it in and return it
715 **/
716 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
717         struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
718         int err;
719         ret->state = INFI_HEADER_DATA;
720         ret->qp = context->qp[node];
721 //      ret->totalTokens = tokensPerProcessor;
722 //      ret->tokensLeft = tokensPerProcessor;
723         ret->nodeNo = node;
724 //      ret->postedRecvs = tokensPerProcessor;
725 #if     CMK_IBVERBS_DEBUG       
726         ret->psn = 0;
727         ret->recvPsn = 0;
728 #endif
729         
730         struct ibv_qp_attr attr = {
731                 .qp_state               = IBV_QPS_RTR,
732                 .path_mtu               = mtu,
733                 .dest_qp_num            = addr[1],
734                 .rq_psn                 = addr[2],
735                 .max_dest_rd_atomic     = 1,
736                 .min_rnr_timer          = 31,
737                 .ah_attr                = {
738                         .is_global      = 0,
739                         .dlid           = addr[0],
740                         .sl             = 0,
741                         .src_path_bits  = 0,
742                         .port_num       = context->ibPort
743                 }
744         };
745         
746         MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
747         MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
748         
749         if (err = ibv_modify_qp(ret->qp, &attr,
750           IBV_QP_STATE              |
751           IBV_QP_AV                 |
752           IBV_QP_PATH_MTU           |
753           IBV_QP_DEST_QPN           |
754           IBV_QP_RQ_PSN             |
755           IBV_QP_MAX_DEST_RD_ATOMIC |
756           IBV_QP_MIN_RNR_TIMER)) {
757                         MACHSTATE1(3,"ERROR %d",err);
758                         CmiAbort("failed to change qp state to RTR");
759         }
760
761         MACHSTATE(3,"qp state changed to RTR");
762         
763         attr.qp_state       = IBV_QPS_RTS;
764         attr.timeout        = 26;
765         attr.retry_cnt      = 20;
766         attr.rnr_retry      = 7;
767         attr.sq_psn         = context->localAddr[node].psn;
768         attr.max_rd_atomic  = 1;
769
770         
771         if (ibv_modify_qp(ret->qp, &attr,
772           IBV_QP_STATE              |
773           IBV_QP_TIMEOUT            |
774           IBV_QP_RETRY_CNT          |
775           IBV_QP_RNR_RETRY          |
776           IBV_QP_SQ_PSN             |
777           IBV_QP_MAX_QP_RD_ATOMIC)) {
778                         fprintf(stderr, "Failed to modify QP to RTS\n");
779                         exit(1);
780         }
781         MACHSTATE(3,"qp state changed to RTS");
782
783         MACHSTATE(3,"} initInfiOtherNodeData");
784         return ret;
785 }
786
787
788 void    infiPostInitialRecvs(){
789         //create the pool and post the receives
790         int numPosts;
791 /*      if(tokensPerProcessor*(_Cmi_numnodes-1) <= maxRecvBuffers){
792                 numPosts = tokensPerProcessor*(_Cmi_numnodes-1);
793         }else{
794                 numPosts = maxRecvBuffers;
795         }*/
796
797         if(_Cmi_numnodes > 1){
798                 numPosts = maxRecvBuffers;
799         }else{
800                 numPosts = 0;
801         }
802         if(numPosts > 0){
803                 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
804                 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
805         }
806
807
808         free(context->qp);
809         context->qp = NULL;
810         free(context->localAddr);
811         context->localAddr= NULL;
812 }
813
814 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
815         int numBuffers;
816         int i;
817         int bigSize;
818         char *bigBuf;
819         struct infiBufferPool *ret;
820         struct ibv_mr *bigKey;
821
822         MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
823
824         page_size = sysconf(_SC_PAGESIZE);
825         ret = malloc(sizeof(struct infiBufferPool));
826         ret->next = NULL;
827         numBuffers=ret->numBuffers = numRecvs;
828         
829         ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
830         
831         bigSize = numBuffers*sizePerBuffer;
832         bigBuf=malloc(bigSize);
833         bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
834         CmiAssert(bigKey != NULL);
835         
836         for(i=0;i<numBuffers;i++){
837                 struct infiBuffer *buffer =  &(ret->buffers[i]);
838                 buffer->type = BUFFER_RECV;
839                 buffer->size = sizePerBuffer;
840                 
841                 /*buffer->buf = malloc(sizePerBuffer);
842                 buffer->key = ibv_reg_mr(context->pd,buffer->buf,buffer->size,IBV_ACCESS_LOCAL_WRITE);*/
843                 buffer->buf = &bigBuf[i*sizePerBuffer];
844                 buffer->key = bigKey;
845
846                 if(buffer->key == NULL){
847                         MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
848                         CmiAssert(buffer->key != NULL);
849                 }
850         }
851         return ret;
852 };
853
854
855
856 /**
857          Post the buffers as recv work requests
858 */
859 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
860         int j,err;
861         struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
862         struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
863         struct ibv_recv_wr *bad_wr;
864         
865         int startBufferIdx=0;
866         MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
867         for(j=0;j<numRecvs;j++){
868                 
869                 
870                 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
871                 sgElements[j].length = sizePerBuffer;
872                 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
873                 
874                 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
875                 workRequests[j].sg_list = &sgElements[j];
876                 workRequests[j].num_sge = 1;
877                 if(j != numRecvs-1){
878                         workRequests[j].next = &workRequests[j+1];
879                 }
880                 
881         }
882         workRequests[numRecvs-1].next = NULL;
883         MACHSTATE(3,"About to call ibv_post_srq_recv");
884         if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
885                 CmiAssert(0);
886         }
887
888         free(workRequests);
889         free(sgElements);
890 }
891
892
893
894
895 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
896
/**
 * Machine-layer shutdown hook. When CMK_IBVERBS_STATS is enabled it prints
 * one line of message, packet and token statistics for this node; otherwise
 * it does nothing.
 */
static void CmiMachineExit()
{
#if CMK_IBVERBS_STATS
        printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s "
               "processBufferedCount %d processBufferedTime %.6lf s "
               "maxTokens %d tokensLeft %d \n",
               _Cmi_mynode,
               msgCount,
               pktCount,
               packetSize,
               CmiTimer(),
               processBufferedCount,
               processBufferedTime,
               maxTokens,
               context->tokensLeft);
#endif
}
903 static void ServiceCharmrun_nolock();
904
905 static void CmiNotifyStillIdle(CmiIdleState *s) {
906 #if CMK_SMP
907         CmiCommLock();
908 /*      if(where == COMM_SERVER_FROM_SMP)*/
909 #endif
910 /*              ServiceCharmrun_nolock();*/
911
912         CommunicationServer_nolock(0);
913 #if CMK_SMP
914         CmiCommUnlock();
915 #endif
916 }
917
918 static inline void increaseTokens(OtherNode node);
919
920 static inline int pollRecvCq(const int toBuffer);
921 static inline int pollSendCq(const int toBuffer);
922
923
/**
 * Block until at least one send token is available.
 *
 * With token flow control enabled, this returns immediately when
 * context->tokensLeft is non-zero; otherwise it keeps running the
 * communication server in buffering mode (toBuffer == 1) until the count
 * becomes non-zero. Without flow control it is a no-op.
 *
 * @param infiData per-node data of the destination (currently unused; the
 *                 global token count is consulted instead)
 */
static inline void getFreeTokens(struct infiOtherNodeData *infiData){
#if CMK_IBVERBS_TOKENS_FLOW
        if(context->tokensLeft != 0){
                return;
        }
        MACHSTATE(3,"GET FREE TOKENS {{{");
        while(context->tokensLeft == 0){
                CommunicationServer_nolock(1);
        }
        MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
#else
        return;
#endif
}
940
941
942 /**
943         Packetize this data and send it
944 **/
945
946
947
948 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
949         int incTokens=0;
950         int retval;
951 #if     CMK_IBVERBS_DEBUG
952         packet->header.psn = (++node->infiData->psn);
953 #endif  
954
955
956
957         packet->elemList[1].addr = (uintptr_t)packet->buf;
958         packet->elemList[1].length = size;
959         packet->elemList[1].lkey = dataKey->lkey;
960         
961         
962         packet->destNode = node;
963         
964 #if CMK_IBVERBS_STATS   
965         pktCount++;
966 #endif  
967         
968         getFreeTokens(node->infiData);
969
970 #if CMK_IBVERBS_INCTOKENS       
971         if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
972                 packet->header.code |= INFIPACKETCODE_INCTOKENS;
973                 incTokens=1;
974         }
975 #endif
976 /*
977         if(!checkQp(node->infiData->qp)){
978                 pollSendCq(1);
979                 CmiAssert(0);
980         }*/
981
982         struct ibv_send_wr *bad_wr=NULL;
983         if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
984                 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
985                 CmiAssert(0);
986         }
987 #if     CMK_IBVERBS_TOKENS_FLOW
988         context->tokensLeft--;
989 #if     CMK_IBVERBS_STATS
990         if(context->tokensLeft < minTokensLeft){
991                 minTokensLeft = context->tokensLeft;
992         }
993 #endif
994 #endif
995
996 /*      if(!checkQp(node->infiData->qp)){
997                 pollSendCq(1);
998                 CmiAssert(0);
999         }*/
1000
1001 #if CMK_IBVERBS_INCTOKENS       
1002         if(incTokens){
1003                 increaseTokens(node);
1004         }
1005 #endif
1006
1007
1008 #if     CMK_IBVERBS_DEBUG
1009         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1010 #else
1011         MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1012 #endif
1013
1014 };
1015
1016
1017 static void inline EnqueueDummyPacket(OtherNode node,int size){
1018         infiPacket packet;
1019         MallocInfiPacket(packet);
1020         packet->size = size;
1021         packet->buf = CmiAlloc(size);
1022         
1023         packet->header.code = INFIDUMMYPACKET;
1024
1025         struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1026         
1027         MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1028         EnqueuePacket(node,packet,size,key);
1029 }
1030
1031
1032
1033
1034
1035
1036 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1037         infiPacket packet;
1038         MallocInfiPacket(packet);
1039         packet->size = size;
1040         packet->buf=data;
1041         
1042         //the nodeNo is added at time of packet allocation
1043         packet->header.code = INFIPACKETCODE_DATA;
1044         
1045         ogm->refcount++;
1046         packet->ogm = ogm;
1047         
1048         struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1049         CmiAssert(key != NULL);
1050         
1051         EnqueuePacket(node,packet,size,key);
1052 };
1053
1054 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1055 static inline void processAllBufferedMsgs();
1056
1057 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1058         int size; char *data;
1059 //      processAllBufferedMsgs();
1060
1061         
1062         ogm->refcount++;
1063   size = ogm->size;
1064   data = ogm->data;
1065
1066 #if CMK_IBVERBS_STATS   
1067         msgCount++;
1068 #endif
1069
1070         MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1071         //First packet has dgram header, other packets dont
1072         
1073   DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1074         
1075         CmiMsgHeaderSetLength(ogm->data,ogm->size);
1076
1077         if(rdma && size > rdmaThreshold){
1078                         EnqueueRdmaPacket(ogm,node);
1079         }else{
1080         
1081                 while(size > dataSize){
1082                         EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1083                         size -= dataSize;
1084                         data += dataSize;
1085                 }
1086                 if(size > 0){
1087                         EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1088                 }
1089         }
1090 //#if   !CMK_SMP
1091         processAllBufferedMsgs();
1092 //#endif
1093         ogm->refcount--;
1094         MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1095 }
1096
1097
1098 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1099         infiPacket packet;
1100
1101         ogm->refcount++;
1102         
1103         MallocInfiPacket(packet);
1104  
1105  {
1106                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1107
1108                 
1109                 packet->size = sizeof(struct infiRdmaPacket);
1110                 packet->buf = (char *)rdmaPacket;
1111                 
1112 /*              struct ibv_mr *key = ibv_reg_mr(context->pd,ogm->data,ogm->size,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE);*/
1113                 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1114                 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1115                 
1116                 packet->header.code = INFIRDMA_START;
1117                 packet->header.nodeNo = _Cmi_mynode;
1118                 packet->ogm = NULL;
1119
1120                 rdmaPacket->type = INFI_MESG;
1121                 rdmaPacket->ogm = ogm;
1122                 rdmaPacket->key = *key;
1123                 rdmaPacket->keyPtr = key;
1124                 rdmaPacket->remoteBuf = ogm->data;
1125                 rdmaPacket->remoteSize = ogm->size;
1126                 
1127                 
1128                 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1129                 
1130                 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1131                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1132         }
1133 }
1134
1135
1136
1137 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1138 static inline void processSendWC(struct ibv_wc *sendWC);
1139 static unsigned int _count=0;
1140 extern int errno;
1141 static int _countAsync=0;
1142 static inline void processAsyncEvents(){
1143         struct ibv_async_event event;
1144         int ready;
1145         _countAsync++;
1146         if(_countAsync < 1){
1147                 return;
1148         }
1149         _countAsync=0;
1150         FD_SET(context->context->async_fd,&context->asyncFds);
1151         CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1152         ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1153         
1154         if(ready==0){
1155                 return;
1156         }
1157         if(ready == -1){
1158 //              printf("[%d] strerror %s \n",_Cmi_mynode,strerror(errno));
1159                 return;
1160         }
1161         
1162         if (ibv_get_async_event(context->context, &event)){
1163                 return;
1164                 CmiAbort("get async event failed");
1165         }
1166         printf("[%d] async event %d \n",_Cmi_mynode, event.event_type);
1167         ibv_ack_async_event(&event);
1168
1169         
1170 }
1171
1172 static void pollCmiDirectQ();
1173
1174 static inline  void CommunicationServer_nolock(int toBuffer) {
1175         int processed;
1176         if(_Cmi_numnodes <= 1){
1177                 pollCmiDirectQ();
1178                 return;
1179         }
1180         MACHSTATE(2,"CommServer_nolock{");
1181         
1182 //      processAsyncEvents();
1183         
1184 //      checkAllQps();
1185
1186         pollCmiDirectQ();
1187
1188         processed = pollRecvCq(toBuffer);
1189         
1190
1191         processed += pollSendCq(toBuffer);
1192         
1193         if(toBuffer == 0){
1194 //              if(processed != 0)
1195                         processAllBufferedMsgs();
1196         }
1197         
1198 //      checkAllQps();
1199 //      _count--;
1200         MACHSTATE(2,"} CommServer_nolock ne");
1201         
1202 }
1203 /*
1204 static inline infiBufferedWC createInfiBufferedWC(){
1205         infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1206         ret->count = 0;
1207         ret->next = ret->prev =NULL;
1208         return ret;
1209 }*/
1210
1211 /****
1212         The buffered recvWC are stored in a doubly linked list of 
1213         arrays or blocks of wcs.
1214         To keep the average insert cost low, a new block is added 
1215         to the top of the list. (resulting in a reverse seq of blocks)
1216         Within a block however wc are stored in a sequence
1217 *****/
1218 /*static  void insertBufferedRecv(struct ibv_wc *wc){
1219         infiBufferedWC block;
1220         MACHSTATE(3,"Insert Buffered Recv called");
1221         if( context->infiBufferedRecvList ==NULL){
1222                 context->infiBufferedRecvList = createInfiBufferedWC();
1223                 block = context->infiBufferedRecvList;
1224         }else{
1225                 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1226                         block = createInfiBufferedWC();
1227                         context->infiBufferedRecvList->prev = block;
1228                         block->next = context->infiBufferedRecvList;
1229                         context->infiBufferedRecvList = block;
1230                 }else{
1231                         block = context->infiBufferedRecvList;
1232                 }
1233         }
1234         
1235         block->wcList[block->count] = *wc;
1236         block->count++;
1237 };
1238 */
1239
1240
1241 /********
1242 go through the blocks of bufferedWC. Process the last block first.
1243 Then the next one and so on. (Processing within a block should happen
1244 in sequence).
1245 Leave the last block in place to avoid having to allocate again
1246 ******/
1247 /*static inline void processBufferedRecvList(){
1248         infiBufferedWC start;
1249         start = context->infiBufferedRecvList;
1250         while(start->next != NULL){
1251                 start = start->next;
1252         }
1253         while(start != NULL){
1254                 int i=0;
1255                 infiBufferedWC tmp;
1256                 for(i=0;i<start->count;i++){
1257                         processRecvWC(&start->wcList[i]);
1258                 }
1259                 if(start != context->infiBufferedRecvList){
1260                         //not the first one
1261                         tmp = start;
1262                         start = start->prev;
1263                         free(tmp);
1264                         start->next = NULL;
1265                 }else{
1266                         start = start->prev;
1267                 }
1268         }
1269         context->infiBufferedRecvList->next = NULL;
1270         context->infiBufferedRecvList->prev = NULL;
1271         context->infiBufferedRecvList->count = 0;
1272 }
1273 */
1274
1275 static inline int pollRecvCq(const int toBuffer){
1276         int i;
1277         int ne;
1278         struct ibv_wc wc[WC_LIST_SIZE];
1279         
1280         MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1281         ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1282 //      CmiAssert(ne >=0);
1283         
1284         if(ne != 0){
1285                 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1286         }
1287         
1288         for(i=0;i<ne;i++){
1289                 if(wc[i].status != IBV_WC_SUCCESS){
1290                         CmiAssert(0);
1291                 }
1292                 switch(wc[i].opcode){
1293                         case IBV_WC_RECV:
1294                                         processRecvWC(&wc[i],toBuffer);
1295                                 break;
1296                         default:
1297                                 CmiAbort("Wrong type of work completion object in recvq");
1298                                 break;
1299                 }
1300                         
1301         }
1302         MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1303         return ne;
1304
1305 }
1306
1307 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1308
1309 static inline int pollSendCq(const int toBuffer){
1310         int i;
1311         int ne;
1312         struct ibv_wc wc[WC_LIST_SIZE];
1313
1314         ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1315 //      CmiAssert(ne >=0);
1316         
1317         
1318         for(i=0;i<ne;i++){
1319                 if(wc[i].status != IBV_WC_SUCCESS){
1320                         printf("[%d] wc[%d] status %d wc[i].opcode %d\n",_Cmi_mynode,i,wc[i].status,wc[i].opcode);
1321 #if CMK_IBVERBS_STATS
1322         printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",_Cmi_mynode,msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1323 #endif
1324                         CmiAssert(0);
1325                 }
1326                 switch(wc[i].opcode){
1327                         case IBV_WC_SEND:{
1328                                 //message received
1329                                 processSendWC(&wc[i]);
1330                                 
1331                                 break;
1332                                 }
1333                         case IBV_WC_RDMA_READ:
1334                         {
1335 //                              processRdmaWC(&wc[i],toBuffer);
1336                                         processRdmaWC(&wc[i],1);
1337                                 break;
1338                         }
1339                         case IBV_WC_RDMA_WRITE:
1340                         {
1341                                 /*** used for CmiDirect puts 
1342                                 Nothing needs to be done on the sender side once send is done **/
1343                                 break;
1344                         }
1345                         default:
1346                                 CmiAbort("Wrong type of work completion object in recvq");
1347                                 break;
1348                 }
1349                         
1350         }
1351         return ne;
1352 }
1353
1354
/******************
Check whether the charmrun control socket (and the stdout
descriptors registered through CmiStdoutAdd) are ready.
*****************/
1359 int CheckSocketsReady(int withDelayMs)
1360 {   
1361   int nreadable;
1362   CMK_PIPE_DECL(withDelayMs);
1363
1364   CmiStdoutAdd(CMK_PIPE_SUB);
1365   if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1366
1367   nreadable=CMK_PIPE_CALL();
1368   ctrlskt_ready_read = 0;
1369   dataskt_ready_read = 0;
1370   dataskt_ready_write = 0;
1371   
1372   if (nreadable == 0) {
1373     MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1374     return nreadable;
1375   }
1376   if (nreadable==-1) {
1377     CMK_PIPE_CHECKERR();
1378     MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1379     return CheckSocketsReady(0);
1380   }
1381   CmiStdoutCheck(CMK_PIPE_SUB);
1382   if (Cmi_charmrun_fd!=-1) 
1383           ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1384   MACHSTATE(1,"} CheckSocketsReady")
1385   return nreadable;
1386 }
1387
1388
1389 /*** Service the charmrun socket
1390 *************/
1391
1392 static void ServiceCharmrun_nolock()
1393 {
1394   int again = 1;
1395   MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1396   while (again)
1397   {
1398   again = 0;
1399   CheckSocketsReady(0);
1400   if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1401   if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1402   }
1403   MACHSTATE(2,"} ServiceCharmrun_nolock end")
1404 }
1405
1406
1407
1408 static void CommunicationServer(int sleepTime, int where){
1409         if( where == COMM_SERVER_FROM_INTERRUPT){
1410                 return;
1411         }
1412 #if CMK_SMP
1413         if(where == COMM_SERVER_FROM_WORKER){
1414                 return;
1415         }
1416         if(where == COMM_SERVER_FROM_SMP){
1417 #endif
1418                 ServiceCharmrun_nolock();
1419 #if CMK_SMP
1420         }
1421         CmiCommLock();
1422 #endif
1423         CommunicationServer_nolock(0);
1424 #if CMK_SMP
1425         CmiCommUnlock();
1426 #endif
1427 }
1428
1429
1430 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
1431
1432
1433 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
1434
1435 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
1436         char *newmsg;
1437         
1438         MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
1439         
1440         OtherNode node = &nodes[nodeNo];
1441         newmsg = node->asm_msg;         
1442         
1443         /// This simple state machine determines if this packet marks the beginning of a new message
1444         // from another node, or if this is another in a sequence of packets
1445         switch(node->infiData->state){
1446                 case INFI_HEADER_DATA:
1447                 {
1448                         int size;
1449                         int rank, srcpe, seqno, magic, i;
1450                         unsigned int broot;
1451                         DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1452                         size = CmiMsgHeaderGetLength(msg);
1453                         MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
1454 //                      CmiAssert(size > 0);
1455 //                      CmiAssert(nodes_by_pe[srcpe] == node);
1456                         
1457 //                      CmiAssert(newmsg == NULL);
1458                         if(len > size){
1459                                 CmiPrintf("size: %d, len:%d.\n", size, len);
1460                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1461                         }
1462                         newmsg = (char *)CmiAlloc(size);
1463       _MEMCHECK(newmsg);
1464       memcpy(newmsg, msg, len);
1465       node->asm_rank = rank;
1466       node->asm_total = size;
1467       node->asm_fill = len;
1468       node->asm_msg = newmsg;
1469                         node->infiData->broot = broot;
1470                         if(len == size){
1471                                 //this is the only packet for this message 
1472                                 node->infiData->state = INFI_HEADER_DATA;
1473                         }else{
1474                                 //there are more packets following
1475                                 node->infiData->state = INFI_DATA;
1476                         }
1477                         break;
1478                 }
1479                 case INFI_DATA:
1480                 {
1481                         if(node->asm_fill + len < node->asm_total && len != dataSize){
1482                                 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1483                                 CmiAbort("packet in the middle does not have expected length");
1484                         }
1485                         if(node->asm_fill+len > node->asm_total){
1486                                 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1487                                 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1488                         }
1489                         //tODO: remove this
1490                         memcpy(newmsg + node->asm_fill,msg,len);
1491                         node->asm_fill += len;
1492                         if(node->asm_fill == node->asm_total){
1493                                 node->infiData->state = INFI_HEADER_DATA;
1494                         }else{
1495                                 node->infiData->state = INFI_DATA;
1496                         }
1497                         break;
1498                 }
1499         }
1500         /// if this packet was the last packet in a message ie state was 
1501         /// reset to infi_header_data
1502         
1503         if(node->infiData->state == INFI_HEADER_DATA){
1504                 int total_size = node->asm_total;
1505                 node->asm_msg = NULL;
1506                 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
1507                 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
1508         }
1509         
1510 };
1511
1512 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
1513 #if CMK_BROADCAST_SPANNING_TREE
1514         if (rank == DGRAM_BROADCAST
1515 #if CMK_NODE_QUEUE_AVAILABLE
1516           || rank == DGRAM_NODEBROADCAST
1517 #endif
1518          ){
1519                                          if(toBuffer){
1520                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1521                                                 }else{
1522                         SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
1523                                                 }
1524                                         }
1525 #elif CMK_BROADCAST_HYPERCUBE
1526         if (rank == DGRAM_BROADCAST
1527 #if CMK_NODE_QUEUE_AVAILABLE
1528           || rank == DGRAM_NODEBROADCAST
1529 #endif
1530          ){
1531                                          if(toBuffer){
1532                                                         insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
1533                                          }else{
1534                         SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
1535                                                 }
1536                                         }
1537 #endif
1538
1539
1540                 
1541                 switch (rank) {
1542         case DGRAM_BROADCAST: {
1543                                 int i;
1544                                 for (i=1; i<_Cmi_mynodesize; i++){
1545                                         CmiPushPE(i, CopyMsg(newmsg, total_size));
1546                                 }
1547           CmiPushPE(0, newmsg);
1548           break;
1549       }
1550 #if CMK_NODE_QUEUE_AVAILABLE
1551         case DGRAM_NODEBROADCAST: 
1552         case DGRAM_NODEMESSAGE: {
1553           CmiPushNode(newmsg);
1554           break;
1555         }
1556 #endif
1557         default:
1558                                 {
1559                                         
1560           CmiPushPE(rank, newmsg);
1561                                 }
1562         }    /* end of switch */
1563                 if(!toBuffer){
1564 //#if !CMK_SMP          
1565                 processAllBufferedMsgs();
1566 //#endif
1567                 }
1568 }
1569
1570
1571 static inline void increasePostedRecvs(int nodeNo);
1572 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1573 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1574
1575 //struct infiDirectRequestPacket;
1576 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1577
1578 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1579         struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;        
1580         struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1581         int nodeNo = header->nodeNo;
1582 #if     CMK_IBVERBS_DEBUG
1583         OtherNode node = &nodes[nodeNo];
1584 #endif
1585         
1586         int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1587 #if     CMK_IBVERBS_DEBUG
1588         if(node->infiData->recvPsn == 0){
1589                 node->infiData->recvPsn = header->psn;
1590         }else{
1591                 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1592                 node->infiData->recvPsn++;
1593         }
1594         MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1595 #else
1596         MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);  
1597 #endif
1598         
1599         if(header->code & INFIPACKETCODE_DATA){
1600                         
1601                         processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1602         }
1603         if(header->code & INFIDUMMYPACKET){
1604                 MACHSTATE(3,"Dummy packet");
1605         }
1606         if(header->code & INFIBARRIERPACKET){
1607                 MACHSTATE(3,"Barrier packet");
1608                 CmiAbort("Should not receive Barrier packet in normal polling loop.  Your Barrier is broken");
1609         }
1610
1611 #if CMK_IBVERBS_INCTOKENS       
1612         if(header->code & INFIPACKETCODE_INCTOKENS){
1613                 increasePostedRecvs(nodeNo);
1614         }
1615 #endif  
1616         if(rdma && header->code & INFIRDMA_START){
1617                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1618 //              if(toBuffer){
1619                         //TODO: make a function of this and use for both acks and requests
1620                         struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1621                         struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1622                         *copyPacket = *rdmaPacket;
1623                         copyPacket->fromNodeNo = nodeNo;
1624                         MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1625                         context->bufferedRdmaRequests = copyPacket;
1626                         copyPacket->next = tmp;
1627                         copyPacket->prev = NULL;
1628                         if(tmp != NULL){
1629                                 tmp->prev = copyPacket;
1630                         }
1631 /*              }else{
1632                         processRdmaRequest(rdmaPacket,nodeNo,0);
1633                 }*/
1634         }
1635         if(rdma && header->code & INFIRDMA_ACK){
1636                 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1637                 processRdmaAck(rdmaPacket);
1638         }
1639 /*      if(header->code & INFIDIRECT_REQUEST){
1640                 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1641                 processDirectRequest(directRequestPacket);
1642         }*/
1643         {
1644                 struct ibv_sge list = {
1645                         .addr   = (uintptr_t) buffer->buf,
1646                         .length = buffer->size,
1647                         .lkey   = buffer->key->lkey
1648                 };
1649         
1650                 struct ibv_recv_wr wr = {
1651                         .wr_id = (uint64_t)buffer,
1652                         .sg_list = &list,
1653                         .num_sge = 1,
1654                         .next = NULL
1655                 };
1656                 struct ibv_recv_wr *bad_wr;
1657         
1658                 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1659                         CmiAssert(0);
1660                 }
1661         }
1662
1663 };
1664
1665
1666
1667
1668 static inline  void processSendWC(struct ibv_wc *sendWC){
1669
1670         infiPacket packet = (infiPacket )sendWC->wr_id;
1671 #if CMK_IBVERBS_TOKENS_FLOW
1672 //      packet->destNode->infiData->tokensLeft++;
1673         context->tokensLeft++;
1674 #endif
1675
1676         MACHSTATE2(3,"Packet send complete node %d  tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1677         if(packet->ogm != NULL){
1678                 packet->ogm->refcount--;
1679                 if(packet->ogm->refcount == 0){
1680                         GarbageCollectMsg(packet->ogm); 
1681                 }
1682         }else{
1683                 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code ==  INFIDUMMYPACKET){
1684
1685                 }
1686         }
1687
1688         FreeInfiPacket(packet);
1689 };
1690
1691
1692
1693 /********************************************************************/
1694 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1695         int nodeNo = fromNodeNo;
1696         OtherNode node = &nodes[nodeNo];
1697         struct infiRdmaPacket *rdmaPacket;
1698
1699         getFreeTokens(node->infiData);
1700 #if CMK_IBVERBS_TOKENS_FLOW
1701 //      node->infiData->tokensLeft--;
1702         context->tokensLeft--;
1703 #if     CMK_IBVERBS_STATS
1704         if(context->tokensLeft < minTokensLeft){
1705                 minTokensLeft = context->tokensLeft;
1706         }
1707 #endif
1708 #endif
1709         
1710         struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1711 //      CmiAssert(buffer != NULL);
1712         
1713         
1714         if(isBuffered){
1715                 rdmaPacket = _rdmaPacket;
1716         }else{
1717                 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1718                 *rdmaPacket = *_rdmaPacket;
1719         }
1720
1721
1722         rdmaPacket->fromNodeNo = fromNodeNo;
1723         rdmaPacket->localBuffer = (void *)buffer;
1724         
1725         buffer->type = BUFFER_RDMA;
1726         buffer->size = rdmaPacket->remoteSize;
1727         
1728         buffer->buf  = (char *)CmiAlloc(rdmaPacket->remoteSize);
1729 //      CmiAssert(buffer->buf != NULL);
1730
1731         buffer->key = METADATAFIELD(buffer->buf)->key;
1732
1733         
1734         MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1735         MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1736 //      CmiAssert(buffer->key != NULL);
1737         
1738         {
1739                 struct ibv_sge list = {
1740                         .addr = (uintptr_t )buffer->buf,
1741                         .length = buffer->size,
1742                         .lkey   = buffer->key->lkey
1743                 };
1744
1745                 struct ibv_send_wr *bad_wr;
1746                 struct ibv_send_wr wr = {
1747                         .wr_id = (uint64_t )rdmaPacket,
1748                         .sg_list = &list,
1749                         .num_sge = 1,
1750                         .opcode = IBV_WR_RDMA_READ,
1751                         .send_flags = IBV_SEND_SIGNALED,
1752                         .wr.rdma = {
1753                                 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1754                                 .rkey = rdmaPacket->key.rkey
1755                         }
1756                 };
1757                 /** post and rdma_read that is a rdma get*/
1758                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1759                         CmiAssert(0);
1760                 }
1761         }
1762
1763 };
1764
1765 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
1766 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1767
1768 static inline  void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1769                 //rdma get done
1770 #if CMK_IBVERBS_STATS
1771         double _startRegTime;
1772 #endif  
1773
1774         struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1775 /*      if(rdmaPacket->type == INFI_DIRECT){
1776                 processDirectWC(rdmaPacket);
1777                 return;
1778         }*/
1779 //      CmiAssert(rdmaPacket->type == INFI_MESG);
1780         struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1781
1782         /*TODO: remove this
1783         memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1784         
1785 /*      CmiAssert(buffer->type == BUFFER_RDMA);
1786         CmiAssert(rdmaWC->byte_len == buffer->size);*/
1787         
1788         {
1789                 int size;
1790                 int rank, srcpe, seqno, magic, i;
1791                 unsigned int broot;
1792                 char *msg = buffer->buf;
1793                 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1794                 size = CmiMsgHeaderGetLength(msg);
1795 /*              CmiAssert(size == buffer->size);*/
1796                 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1797         }
1798         MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1799
1800         
1801         free(buffer);
1802         
1803         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1804         //we are sending this ack as a response to a successful
1805         // rdma_Read.. the token for that rdma_Read needs to be freed
1806 #if CMK_IBVERBS_TOKENS_FLOW     
1807         //node->infiData->tokensLeft++;
1808         context->tokensLeft++;
1809 #endif
1810
1811         //send ack to sender if toBuffer is off otherwise buffer it
1812         if(toBuffer){
1813                 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1814                 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1815                 context->bufferedRdmaAcks = rdmaPacket;
1816                 rdmaPacket->next = tmp;
1817                 rdmaPacket->prev = NULL;
1818                 if(tmp != NULL){
1819                         tmp->prev = rdmaPacket; 
1820                 }
1821         }else{
1822                 EnqueueRdmaAck(rdmaPacket);             
1823                 free(rdmaPacket);
1824         }
1825 }
1826
1827 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1828         infiPacket packet;
1829         OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1830
1831         
1832         MallocInfiPacket(packet);
1833         {
1834                 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1835                 *ackPacket = *rdmaPacket;
1836                 packet->size = sizeof(struct infiRdmaPacket);
1837                 packet->buf = (char *)ackPacket;
1838                 packet->header.code = INFIRDMA_ACK;
1839                 packet->ogm=NULL;
1840                 
1841                 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1842         
1843
1844                 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1845         }
1846 };
1847
1848
1849 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1850         MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1851         rdmaPacket->ogm->refcount--;
1852         GarbageCollectMsg(rdmaPacket->ogm);
1853 }
1854
1855
1856 /****************************
1857  Deal with all the buffered (delayed) messages
1858  such as processing recvd broadcasts, sending
1859  rdma acks and processing recvd rdma requests
1860 ******************************/
1861
1862
1863 static inline infiBufferedBcastPool createBcastPool(){
1864         int i;
1865         infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1866         ret->count = 0;
1867         ret->next = ret->prev = NULL;   
1868         for(i=0;i<BCASTLIST_SIZE;i++){
1869                 ret->bcastList[i].valid = 0;
1870         }
1871         return ret;
1872 };
1873 /****
1874         The buffered bcast messages are stored in a doubly linked list of 
1875         arrays or blocks.
1876         To keep the average insert cost low, a new block is added 
1877         to the top of the list. (resulting in a reverse seq of blocks)
1878         Within a block however bcast are stored in increasing order sequence
1879 *****/
1880
1881 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1882         if(context->bufferedBcastList == NULL){
1883                 context->bufferedBcastList = createBcastPool();
1884         }else{
1885                 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1886                         infiBufferedBcastPool tmp;
1887                         tmp = createBcastPool();
1888                         context->bufferedBcastList->prev = tmp;
1889                         tmp->next = context->bufferedBcastList;
1890                         context->bufferedBcastList = tmp;
1891                 }
1892         }
1893         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1894         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1895         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1896         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1897         context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1898         
1899         MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1900         
1901         context->bufferedBcastList->count++;
1902 }
1903
/*********
	Go through the blocks of buffered bcast messages. The last block in the
	list is processed first; within a block, messages are processed in sequence.
*********/
1908 static inline void processBufferedBcast(){
1909         infiBufferedBcastPool start;
1910
1911         if(context->bufferedBcastList == NULL){
1912                 return;
1913         }
1914         start = context->bufferedBcastList;
1915         if(context->insideProcessBufferedBcasts==1){
1916                 return;
1917         }
1918         context->insideProcessBufferedBcasts=1;
1919
1920         while(start->next != NULL){
1921                 start = start->next;
1922         }
1923         
1924         while(start != NULL){
1925                 int i=0;
1926                 infiBufferedBcastPool tmp;
1927                 if(start->count != 0){
1928                         MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1929                 }
1930                 for(i=0;i<start->count;i++){
1931                         if(start->bcastList[i].valid == 0){
1932                                 continue;
1933                         }
1934                         start->bcastList[i].valid=0;
1935                         MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1936 #if CMK_BROADCAST_SPANNING_TREE
1937         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1938 #if CMK_NODE_QUEUE_AVAILABLE
1939           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1940 #endif
1941          ){
1942                 SendSpanningChildren(NULL, 0, start->bcastList[i].size,start->bcastList[i].msg, start->bcastList[i].broot,start->bcastList[i].asm_rank);
1943                                         }
1944 #elif CMK_BROADCAST_HYPERCUBE
1945         if (start->bcastList[i].asm_rank == DGRAM_BROADCAST
1946 #if CMK_NODE_QUEUE_AVAILABLE
1947           || start->bcastList[i].asm_rank == DGRAM_NODEBROADCAST
1948 #endif
1949          ){
1950                 SendHypercube(NULL, 0,start->bcastList[i].size,start->bcastList[i].msg ,start->bcastList[i].broot,start->bcastList[i].asm_rank);
1951                                         }
1952 #endif
1953                 }
1954                 if(start->count != 0){
1955                         MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1956                 }
1957                 
1958                 tmp = start;
1959                 start = start->prev;
1960                 free(tmp);
1961                 if(start != NULL){
1962                         //not the first one
1963                         start->next = NULL;
1964                 }
1965         }
1966
1967         context->bufferedBcastList = NULL;
1968 /*      context->bufferedBcastList->prev = NULL;
1969         context->bufferedBcastList->count =0;   */
1970         context->insideProcessBufferedBcasts=0;
1971         MACHSTATE(2,"processBufferedBcast done ");
1972 };
1973
1974
1975 static inline void processBufferedRdmaAcks(){
1976         struct infiRdmaPacket *start = context->bufferedRdmaAcks;
1977         if(start == NULL){
1978                 return;
1979         }
1980         while(start->next != NULL){
1981                 start = start->next;
1982         }
1983         while(start != NULL){
1984                 struct infiRdmaPacket *rdmaPacket=start;
1985                 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
1986                 EnqueueRdmaAck(rdmaPacket);
1987                 start = start->prev;
1988                 free(rdmaPacket);
1989         }
1990         context->bufferedRdmaAcks=NULL;
1991 }
1992
1993
1994
1995 static inline void processBufferedRdmaRequests(){
1996         struct infiRdmaPacket *start = context->bufferedRdmaRequests;
1997         if(start == NULL){
1998                 return;
1999         }
2000         
2001         
2002         while(start->next != NULL){
2003                 start = start->next;
2004         }
2005         while(start != NULL){
2006                 struct infiRdmaPacket *rdmaPacket=start;
2007                 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2008                 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2009                 start = start->prev;
2010         }
2011         
2012         context->bufferedRdmaRequests=NULL;
2013 }
2014
2015
2016
2017
2018
/**
 * Work off everything that has been delayed: buffered broadcasts first,
 * then buffered rdma acks, then buffered rdma requests.
 *
 * With CMK_IBVERBS_STATS the call is counted in processBufferedCount and
 * its wall time is added to processBufferedTime.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double began = CmiWallTimer();
	processBufferedCount += 1;
#endif

	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
	processBufferedTime += (CmiWallTimer()-began);
#endif
}
2032
2033
2034 /*************************
2035         Increase tokens when short of them
2036 **********/
2037 static inline void increaseTokens(OtherNode node){
2038         int err;
2039         int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2040         if(node->infiData->totalTokens + increase > maxTokens){
2041                 increase = maxTokens-node->infiData->totalTokens;
2042         }
2043         node->infiData->totalTokens += increase;
2044         node->infiData->tokensLeft += increase;
2045         MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2046         //increase the size of the sendCq
2047         int currentCqSize = context->sendCqSize;
2048         if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2049                 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",_Cmi_mynode,increase,currentCqSize, node->infiData->totalTokens);
2050                 CmiAssert(0);
2051         }
2052         context->sendCqSize+= increase;
2053 };
2054
2055 static void increasePostedRecvs(int nodeNo){
2056         OtherNode node = &nodes[nodeNo];
2057         int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;     
2058         int recvIncrease = tokenIncrease;
2059         if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2060                 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2061         }
2062         if(tokenIncrease+context->srqSize > maxRecvBuffers){
2063                 recvIncrease = maxRecvBuffers-context->srqSize;
2064         }
2065         node->infiData->postedRecvs+= recvIncrease;
2066         context->srqSize += recvIncrease;
2067         MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2068         //increase the size of the recvCq
2069         int currentCqSize = context->recvCqSize;
2070         if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2071                 CmiAssert(0);
2072         }
2073         context->recvCqSize += tokenIncrease;
2074         if(recvIncrease > 0){
2075                 //create another bufferPool and attach it to the top of the current one
2076                 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2077                 newPool->next = context->recvBufferPool;
2078                 context->recvBufferPool = newPool;
2079                 postInitialRecvs(newPool,recvIncrease,packetSize);
2080         }
2081
2082 };
2083
2084
2085
2086
2087 /*********************************************
2088         Memory management routines for RDMA
2089
2090 ************************************************/
2091
2092 /**
2093         There are INFINUMPOOLS of memory.
2094         The first pool is of size firstBinSize.
2095         The ith pool is of size firstBinSize*2^i
2096 */
2097
2098 static void initInfiCmiChunkPools(){
2099         int i,j;
2100         int size = firstBinSize;
2101         int nodeSize;
2102
2103 #if THREAD_MULTI_POOL
2104         nodeSize = INFI_NODE_SIZE;
2105         infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2106         for(i = 0; i < nodeSize; i++){
2107                 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2108         }
2109         for(j = 0; j < nodeSize; j++){
2110                 size = firstBinSize;
2111                 for(i=0;i<INFINUMPOOLS;i++){
2112                         infiCmiChunkPools[j][i].size = size;
2113                         infiCmiChunkPools[j][i].startBuf = NULL;
2114                         infiCmiChunkPools[j][i].count = 0;
2115                         size *= 2;
2116                 }
2117         }
2118
2119         // creating the n^2 system of queues
2120         queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2121         for(i = 0; i < nodeSize; i++){
2122                 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2123         }
2124         for(i = 0; i < nodeSize; i++)
2125                 for(j = 0; j < nodeSize; j++)
2126                         queuePool[i][j] = PCQueueCreate();
2127
2128 #else
2129
2130         size = firstBinSize;    
2131         for(i=0;i<INFINUMPOOLS;i++){
2132                 infiCmiChunkPools[i].size = size;
2133                 infiCmiChunkPools[i].startBuf = NULL;
2134                 infiCmiChunkPools[i].count = 0;
2135                 size *= 2;
2136         }
2137 #endif
2138
2139 }
2140
2141 /***********
2142 Register memory for a part of a received multisend message
2143 *************/
2144 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2145         infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2146         char *res=msg-sizeof(infiCmiChunkHeader);
2147         metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2148         metaData->owner = NULL;
2149         metaData->poolIdx = INFIMULTIPOOL;
2150
2151         return metaData;
2152 };
2153
2154
2155 #if THREAD_MULTI_POOL
2156 static inline void *getInfiCmiChunkThread(int dataSize){
2157         //find out to which pool this dataSize belongs to
2158         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2159         int ratio = dataSize/firstBinSize;
2160         int poolIdx=0;
2161         void *res;
2162
2163         //printf("Hi\n");
2164         MACHSTATE1(3,"Rank=%d",CmiMyRank());
2165         
2166         while(ratio > 0){
2167                 ratio  = ratio >> 1;
2168                 poolIdx++;
2169         }
2170         MACHSTATE1(2,"This is %d",CmiMyRank());
2171         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2172         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2173                 infiCmiChunkMetaData *metaData;         
2174                 infiCmiChunkHeader *hdr;
2175                 int allocSize;
2176                 int count=1;
2177                 int i;
2178                 struct ibv_mr *key;
2179                 void *origres;
2180                 
2181                 
2182                 if(poolIdx < INFINUMPOOLS ){
2183                         allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2184                 }else{
2185                         allocSize = dataSize;
2186                 }
2187
2188                 if(poolIdx < blockThreshold){
2189                         count = blockAllocRatio;
2190                 }
2191                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2192                 hdr = res;
2193                 
2194                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2195                 CmiAssert(key != NULL);
2196                 
2197                 origres = (res += sizeof(infiCmiChunkHeader));
2198
2199                 for(i=0;i<count;i++){
2200                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2201                         metaData->key = key;
2202                         metaData->owner = hdr;
2203                         metaData->poolIdx = poolIdx;
2204                         metaData->parentPe = CmiMyRank();                                               // setting the parent PE
2205
2206                         if(i == 0){
2207                                 metaData->owner->metaData->count = count;
2208                                 metaData->nextBuf = NULL;
2209                         }else{
2210                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2211                                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2212                                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2213                                 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2214                                 
2215                         }
2216                         if(i != count-1){
2217                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2218                         }
2219           }     
2220                 
2221                 
2222                 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2223                 
2224                 return origres;
2225         }
2226         if(poolIdx < INFINUMPOOLS){
2227                 infiCmiChunkMetaData *metaData;                         
2228         
2229                 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2230                 res += sizeof(infiCmiChunkHeader);
2231
2232                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2233                 metaData = METADATAFIELD(res);
2234
2235                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2236                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2237                 
2238                 metaData->nextBuf = NULL;
2239 //              CmiAssert(metaData->poolIdx == poolIdx);
2240
2241                 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2242                 return res;
2243         }
2244
2245         CmiAssert(0);
2246
2247         
2248 };
2249 #else
2250 static inline void *getInfiCmiChunk(int dataSize){
2251         //find out to which pool this dataSize belongs to
2252         // poolIdx = floor(log2(dataSize/firstBinSize))+1
2253         int ratio = dataSize/firstBinSize;
2254         int poolIdx=0;
2255         void *res;
2256
2257         while(ratio > 0){
2258                 ratio  = ratio >> 1;
2259                 poolIdx++;
2260         }
2261         MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2262         if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2263                 infiCmiChunkMetaData *metaData;
2264                 infiCmiChunkHeader *hdr;
2265                 int allocSize;
2266                 int count=1;
2267                 int i;
2268                 struct ibv_mr *key;
2269                 void *origres;
2270
2271
2272                 if(poolIdx < INFINUMPOOLS ){
2273                         allocSize = infiCmiChunkPools[poolIdx].size;
2274                 }else{
2275                         allocSize = dataSize;
2276                 }
2277
2278                 if(poolIdx < blockThreshold){
2279                         count = blockAllocRatio;
2280                 }
2281                 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
2282                 hdr = res;
2283
2284                 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2285                 CmiAssert(key != NULL);
2286
2287                 origres = (res += sizeof(infiCmiChunkHeader));
2288
2289                 for(i=0;i<count;i++){
2290                         metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2291                         metaData->key = key;
2292                         metaData->owner = hdr;
2293                         metaData->poolIdx = poolIdx;
2294
2295                         if(i == 0){
2296                                 metaData->owner->metaData->count = count;
2297                                 metaData->nextBuf = NULL;
2298                         }else{
2299                                 void *startBuf = res - sizeof(infiCmiChunkHeader);
2300                                 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2301                                 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2302                                 infiCmiChunkPools[poolIdx].count++;
2303
2304                         }
2305                         if(i != count-1){
2306                                 res += (allocSize+sizeof(infiCmiChunkHeader));
2307                         }
2308           }
2309
2310
2311                 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2312
2313                 return origres;
2314         }
2315         if(poolIdx < INFINUMPOOLS){
2316                 infiCmiChunkMetaData *metaData;
2317
2318                 res = infiCmiChunkPools[poolIdx].startBuf;
2319                 res += sizeof(infiCmiChunkHeader);
2320
2321                 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2322                 metaData = METADATAFIELD(res);
2323
2324                 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2325                 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2326
2327                 metaData->nextBuf = NULL;
2328 //              CmiAssert(metaData->poolIdx == poolIdx);
2329
2330                 infiCmiChunkPools[poolIdx].count--;
2331                 return res;
2332         }
2333
2334         CmiAssert(0);
2335
2336
2337 };
2338 #endif
2339
2340
2341 void * infi_CmiAlloc(int size){
2342         void *res;
2343
2344
2345 #if THREAD_MULTI_POOL
2346         res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2347         res -= sizeof(CmiChunkHeader);
2348         return res;
2349 #else
2350 #if CMK_SMP     
2351         CmiMemLock();
2352 #endif
2353 /*(     if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2354                 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2355
2356                 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));     
2357                 res -= sizeof(CmiChunkHeader);
2358 #if CMK_SMP     
2359         CmiMemUnlock();
2360 #endif
2361 /*      }else{
2362                 res = malloc(size);
2363         }*/
2364         return res;
2365 #endif
2366 }
2367
2368 #if THREAD_MULTI_POOL
2369 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2370 void infi_CmiFreeDirect(void *ptr){
2371         int size;
2372         int parentPe;
2373         void *freePtr = ptr;
2374
2375         //ptr += sizeof(CmiChunkHeader);
2376         size = SIZEFIELD (ptr);
2377 /*      if(size > firstBinSize){*/
2378         infiCmiChunkMetaData *metaData;
2379         int poolIdx;
2380         //there is a infiniband specific header
2381         freePtr = ptr - sizeof(infiCmiChunkHeader);
2382         metaData = METADATAFIELD(ptr);
2383         poolIdx = metaData->poolIdx;
2384
2385         MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2386 //      CmiAssert(poolIdx >= 0);
2387         if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].count <= INFIMAXPERPOOL){
2388                 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2389                 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = freePtr;
2390                         infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2391
2392                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf,infiCmiChunkPools[CmiMyRank()][poolIdx].count);
2393                 }else{
2394                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2395                         metaData->owner->metaData->count--;
2396                         if(metaData->owner->metaData == metaData){
2397                                 //I am the owner
2398                                 if(metaData->owner->metaData->count == 0){
2399                                         //all the chunks have been freed
2400                                         ibv_dereg_mr(metaData->key);
2401                                         free(freePtr);
2402                                         free(metaData);
2403                                 }
2404                                 //if I am the owner and all the chunks have not been
2405                                 // freed dont free my metaData. will need later
2406                         }else{
2407                                 if(metaData->owner->metaData->count == 0){
2408                                         //need to free the owner's buffer and metadata
2409                                         freePtr = metaData->owner;
2410                                         ibv_dereg_mr(metaData->key);
2411                                         free(metaData->owner->metaData);
2412                                         free(freePtr);
2413                                 }
2414                                 free(metaData);
2415                         }
2416                 }
2417 }
2418
2419
2420 void infi_CmiFree(void *ptr){
2421         int i,j;
2422         int size;
2423         int parentPe;
2424         void *pointer;
2425         void *freePtr = ptr;
2426
2427         MACHSTATE(3,"Freeing");
2428
2429         ptr += sizeof(CmiChunkHeader);
2430         size = SIZEFIELD (ptr);
2431 /*      if(size > firstBinSize){*/
2432         infiCmiChunkMetaData *metaData;
2433         int poolIdx;
2434         //there is a infiniband specific header
2435         freePtr = ptr - sizeof(infiCmiChunkHeader);
2436         metaData = METADATAFIELD(ptr);
2437         poolIdx = metaData->poolIdx;
2438
2439         if(poolIdx == INFIMULTIPOOL){
2440                 /** this is a part of a received mult message  
2441                     it will be freed correctly later
2442                 **/
2443
2444                 return;
2445         }
2446
2447         // checking if this free operation is my responsibility
2448         parentPe = metaData->parentPe;
2449         if(parentPe != CmiMyRank()){
2450                 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2451                 return;
2452         }
2453
2454         infi_CmiFreeDirect(ptr);
2455
2456         // checking free request queues
2457         for(i = 0; i < INFI_NODE_SIZE; i++){
2458                 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2459                         for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2460                                 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2461                                 infi_CmiFreeDirect(pointer);    
2462                         }
2463                 }
2464         }
2465 }
2466
2467 #else
2468 void infi_CmiFree(void *ptr){
2469         int size;
2470         void *freePtr = ptr;
2471         
2472 #if CMK_SMP     
2473         CmiMemLock();
2474 #endif
2475         ptr += sizeof(CmiChunkHeader);
2476         size = SIZEFIELD (ptr);
2477 /*      if(size > firstBinSize){*/
2478                 infiCmiChunkMetaData *metaData;
2479                 int poolIdx;
2480                 //there is a infiniband specific header
2481                 freePtr = ptr - sizeof(infiCmiChunkHeader);
2482                 metaData = METADATAFIELD(ptr);
2483                 poolIdx = metaData->poolIdx;
2484                 if(poolIdx == INFIMULTIPOOL){
2485                         /** this is a part of a received mult message  
2486                         it will be freed correctly later
2487                         **/
2488                         
2489                         return;
2490                 }
2491                 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2492 //              CmiAssert(poolIdx >= 0);
2493                 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
2494                         metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2495                         infiCmiChunkPools[poolIdx].startBuf = freePtr;
2496                         infiCmiChunkPools[poolIdx].count++;
2497                         
2498                         MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2499                 }else{
2500                         MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2501                         metaData->owner->metaData->count--;
2502                         if(metaData->owner->metaData == metaData){
2503                                 //I am the owner
2504                                 if(metaData->owner->metaData->count == 0){
2505                                         //all the chunks have been freed
2506                                         ibv_dereg_mr(metaData->key);
2507                                         free(freePtr);
2508                                         free(metaData);
2509                                 }
2510                                 //if I am the owner and all the chunks have not been
2511                                 // freed dont free my metaData. will need later
2512                         }else{
2513                                 if(metaData->owner->metaData->count == 0){
2514                                         //need to free the owner's buffer and metadata
2515                                         freePtr = metaData->owner;
2516                                         ibv_dereg_mr(metaData->key);
2517                                         free(metaData->owner->metaData);
2518                                         free(freePtr);
2519                                 }
2520                                 free(metaData);
2521                         }
2522                 }       
2523 #if CMK_SMP     
2524         CmiMemUnlock();
2525 #endif
2526 /*      }else{
2527                 free(freePtr);
2528         }*/
2529 }
2530 #endif
2531
/*********************************************************************************************
This section implements CmiDirect, a variant of persistent communication in which the user
transfers data between processors without using Charm++ messages. It lets the user send and
receive data from the middle of their arrays without any copying on either the send or the
receive side.
*********************************************************************************************/
2538
2539 struct infiDirectRequestPacket{
2540         int senderProc;
2541         int handle;
2542         struct ibv_mr senderKey;
2543         void *senderBuf;
2544         int senderBufSize;
2545 };
2546
2547 #include "cmidirect.h"
2548
2549 #define MAXHANDLES 512
2550
2551 struct infiDirectHandleStruct;
2552
2553
/**
 * Node of the singly linked queue of receive handles that are polled for
 * arrived CmiDirect data.
 */
typedef struct directPollingQNodeStruct {
	struct infiDirectHandleStruct *handle;   /* handle this node stands for */
	struct directPollingQNodeStruct *next;   /* following node, NULL at the tail */
	double *lastDouble;                      /* address of the last double in the handle's buffer */
} directPollingQNode;
2559
2560 typedef struct infiDirectHandleStruct{
2561         int id;
2562         void *buf;
2563         int size;
2564         struct ibv_mr *key;
2565         void (*callbackFnPtr)(void *);
2566         void *callbackData;
2567 //      struct infiDirectRequestPacket *packet;
2568         struct infiDirectUserHandle userHandle;
2569         struct infiRdmaPacket *rdmaPacket;
2570         directPollingQNode pollingQNode;
2571 }       infiDirectHandle;
2572
2573 typedef struct infiDirectHandleTableStruct{
2574         infiDirectHandle handles[MAXHANDLES];
2575         struct infiDirectHandleTableStruct *next;
2576 } infiDirectHandleTable;
2577
2578
2579 // data structures 
2580
2581 directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;
2582
2583 static infiDirectHandleTable **sendHandleTable=NULL;
2584 static infiDirectHandleTable **recvHandleTable=NULL;
2585
2586 static int *recvHandleCount=NULL;
2587
2588 void addHandleToPollingQ(infiDirectHandle *handle){
2589 //      directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2590         directPollingQNode *newNode = &(handle->pollingQNode);
2591         newNode->handle = handle;
2592         newNode->next = NULL;
2593         if(headDirectPollingQ==NULL){
2594                 /*empty pollingQ*/
2595                 headDirectPollingQ = newNode;
2596                 tailDirectPollingQ = newNode;
2597         }else{
2598                 tailDirectPollingQ->next = newNode;
2599                 tailDirectPollingQ = newNode;
2600         }
2601 };
2602 /*
2603 infiDirectHandle *removeHandleFromPollingQ(){
2604         if(headDirectPollingQ == NULL){
2605                 //polling Q is empty
2606                 return NULL;
2607         }
2608         directPollingQNode *retNode = headDirectPollingQ;
2609         if(headDirectPollingQ == tailDirectPollingQ){
2610                 //PollingQ has one node 
2611                 headDirectPollingQ = tailDirectPollingQ = NULL;
2612         }else{
2613                 headDirectPollingQ = headDirectPollingQ->next;
2614         }
2615         infiDirectHandle *retHandle = retNode->handle;
2616         free(retNode);
2617         return retHandle;
2618 }*/
2619
2620 static inline infiDirectHandleTable **createHandleTable(){
2621         infiDirectHandleTable **table = malloc(_Cmi_numnodes*sizeof(infiDirectHandleTable *));
2622         int i;
2623         for(i=0;i<_Cmi_numnodes;i++){
2624                 table[i] = NULL;
2625         }
2626         return table;
2627 }
2628
2629 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2630         *tableIdx = handle/MAXHANDLES;
2631         *idx = handle%MAXHANDLES;
2632 };
2633
/**
 Stores initialValue in the last sizeof(double) bytes of the buffer
 recvBuf[0..recvBufSize).
**/
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	const int offset = recvBufSize - sizeof(double);
	char *base = recvBuf;

	*(double *)(base + offset) = initialValue;
}
2641
2642
2643 /**
2644  To be called on the receiver to create a handle and return its number
2645 **/
2646 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2647         int newHandle;
2648         int tableIdx,idx;
2649         int i;
2650         infiDirectHandleTable *table;
2651         struct infiDirectUserHandle userHandle;
2652         
2653         CmiAssert(recvBufSize > sizeof(double));
2654         
2655         if(recvHandleTable == NULL){
2656                 recvHandleTable = createHandleTable();
2657                 recvHandleCount = malloc(sizeof(int)*_Cmi_numnodes);
2658                 for(i=0;i<_Cmi_numnodes;i++){
2659                         recvHandleCount[i] = -1;
2660                 }
2661         }
2662         if(recvHandleTable[senderNode] == NULL){
2663                 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2664                 recvHandleTable[senderNode]->next = NULL;               
2665         }
2666         
2667         newHandle = ++recvHandleCount[senderNode];
2668         CmiAssert(newHandle >= 0);
2669         
2670         calcHandleTableIdx(newHandle,&tableIdx,&idx);
2671         
2672         table = recvHandleTable[senderNode];
2673         for(i=0;i<tableIdx;i++){
2674                 if(table->next ==NULL){
2675                         table->next = malloc(sizeof(infiDirectHandleTable));
2676                         table->next->next = NULL;
2677                 }
2678                 table = table->next;
2679         }
2680         table->handles[idx].id = newHandle;
2681         table->handles[idx].buf = recvBuf;
2682         table->handles[idx].size = recvBufSize;
2683 #if CMI_DIRECT_DEBUG
2684         CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNode(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2685 #endif
2686         table->handles[idx].callbackFnPtr = callbackFnPtr;
2687         table->handles[idx].callbackData = callbackData;
2688         table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2689         CmiAssert(table->handles[idx].key != NULL);
2690
2691 /*      table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2692         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2693         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2694         
2695         userHandle.handle = newHandle;
2696         userHandle.recverNode = _Cmi_mynode;
2697         userHandle.senderNode = senderNode;
2698         userHandle.recverBuf = recvBuf;
2699         userHandle.recverBufSize = recvBufSize;
2700         memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2701         userHandle.initialValue = initialValue;
2702         
2703         table->handles[idx].userHandle = userHandle;
2704         
2705         initializeLastDouble(recvBuf,recvBufSize,initialValue);
2706
2707   {
2708          int index = table->handles[idx].size - sizeof(double);
2709    table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2710         } 
2711         
2712         addHandleToPollingQ(&(table->handles[idx]));
2713         
2714 //      MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2715         
2716         return userHandle;
2717 }
2718
2719 /****
2720  To be called on the sender to attach the sender's buffer to this handle
2721 ******/
2722 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2723         int tableIdx,idx;
2724         int i;
2725         int handle = userHandle->handle;
2726         int recverNode  = userHandle->recverNode;
2727
2728         infiDirectHandleTable *table;
2729
2730         if(sendHandleTable == NULL){
2731                 sendHandleTable = createHandleTable();
2732         }
2733         if(sendHandleTable[recverNode] == NULL){
2734                 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2735                 sendHandleTable[recverNode]->next = NULL;
2736         }
2737         
2738         CmiAssert(handle >= 0);
2739         calcHandleTableIdx(handle,&tableIdx,&idx);
2740         
2741         table = sendHandleTable[recverNode];
2742         for(i=0;i<tableIdx;i++){
2743                 if(table->next ==NULL){
2744                         table->next = malloc(sizeof(infiDirectHandleTable));
2745                         table->next->next = NULL;
2746                 }
2747                 table = table->next;
2748         }
2749
2750         table->handles[idx].id = handle;
2751         table->handles[idx].buf = sendBuf;
2752
2753         table->handles[idx].size = sendBufSize;
2754 #if CMI_DIRECT_DEBUG
2755         CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2756 #endif
2757         table->handles[idx].callbackFnPtr = table->handles[idx].callbackData = NULL;
2758         table->handles[idx].key =  ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2759         CmiAssert(table->handles[idx].key != NULL);
2760         table->handles[idx].userHandle = *userHandle;
2761         CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2762         
2763         table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2764         table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2765         table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2766         
2767         
2768 /*      table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2769         table->handles[idx].packet->senderProc = _Cmi_mynode;
2770         table->handles[idx].packet->handle = handle;
2771         table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2772         table->handles[idx].packet->senderBuf = sendBuf;
2773         table->handles[idx].packet->senderBufSize = sendBufSize;*/
2774         
2775         MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2776 };
2777
2778
2779
2780
2781
2782 /****
2783 To be called on the sender to do the actual data transfer
2784 ******/
2785 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2786         int handle = userHandle->handle;
2787         int recverNode = userHandle->recverNode;
2788         if(recverNode == _Cmi_mynode){
2789                 /*when the sender and receiver are on the same
2790                 processor, just look up the sender and receiver
2791                 buffers and do a memcpy*/
2792
2793                 infiDirectHandleTable *senderTable;
2794                 infiDirectHandleTable *recverTable;
2795                 
2796                 int tableIdx,idx,i;
2797
2798                 
2799                 /*find entry for this handle in sender table*/
2800                 calcHandleTableIdx(handle,&tableIdx,&idx);
2801                 CmiAssert(sendHandleTable!= NULL);
2802                 senderTable = sendHandleTable[_Cmi_mynode];
2803                 CmiAssert(senderTable != NULL);
2804                 for(i=0;i<tableIdx;i++){
2805                         senderTable = senderTable->next;
2806                 }
2807
2808                 /**find entry for this handle in recver table*/
2809                 recverTable = recvHandleTable[recverNode];
2810                 CmiAssert(recverTable != NULL);
2811                 for(i=0;i< tableIdx;i++){
2812                         recverTable = recverTable->next;
2813                 }
2814                 
2815                 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
2816                 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
2817 #if CMI_DIRECT_DEBUG
2818                 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
2819 #endif
2820                 // The polling Q should find you and handle the callback and pollingq entry
2821                 //              (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
2822                 
2823
2824         }else{
2825                 infiPacket packet;
2826                 int tableIdx,idx;
2827                 int i;
2828                 OtherNode node;
2829                 infiDirectHandleTable *table;
2830
2831                 calcHandleTableIdx(handle,&tableIdx,&idx);
2832
2833                 table = sendHandleTable[recverNode];
2834                 CmiAssert(table != NULL);
2835                 for(i=0;i<tableIdx;i++){
2836                         table = table->next;
2837                 }
2838
2839 //              MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
2840 #if CMI_DIRECT_DEBUG
2841                 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
2842 #endif
2843
2844                 
2845                 {
2846                         
2847                         OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
2848                         struct ibv_sge list = {
2849                                 .addr = (uintptr_t )table->handles[idx].buf,
2850                                 .length = table->handles[idx].size,
2851                                 .lkey   = table->handles[idx].key->lkey
2852                         };
2853                         
2854                         struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
2855
2856                         struct ibv_send_wr *bad_wr;
2857                         struct ibv_send_wr wr = {
2858                                 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
2859                                 .sg_list = &list,
2860                                 .num_sge = 1,
2861                                 .opcode = IBV_WR_RDMA_WRITE,
2862                                 .send_flags = IBV_SEND_SIGNALED,
2863                                 
2864                                 .wr.rdma = {
2865                                         .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
2866                                         .rkey = remoteKey->rkey
2867                                 }
2868                         };
2869                         /** post and rdma_read that is a rdma get*/
2870                         if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
2871                                 CmiAssert(0);
2872                         }
2873                 }
2874
2875         /*      MallocInfiPacket (packet);
2876                 {
2877                         packet->size = sizeof(struct infiDirectRequestPacket);
2878                         packet->buf = (char *)(table->handles[idx].packet);
2879                         struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
2880                         EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
2881                 }*/
2882         }
2883
2884 };
2885
2886 /**** need not be called the first time *********/
2887 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
2888   initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
2889 }
2890
2891 /**** need not be called the first time *********/
2892 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
2893         int handle = userHandle->handle;
2894         int tableIdx,idx,i;
2895         infiDirectHandleTable *table;
2896         calcHandleTableIdx(handle,&tableIdx,&idx);
2897         
2898         table = recvHandleTable[userHandle->senderNode];
2899         CmiAssert(table != NULL);
2900         for(i=0;i<tableIdx;i++){
2901                 table = table->next;
2902         }
2903 #if CMI_DIRECT_DEBUG
2904   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
2905 #endif  
2906         addHandleToPollingQ(&(table->handles[idx]));
2907         
2908
2909 }
2910
2911 /**** need not be called the first time *********/
2912 void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
2913         int handle = userHandle->handle;
2914         int tableIdx,idx,i;
2915         infiDirectHandleTable *table;
2916         
2917         initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
2918
2919         calcHandleTableIdx(handle,&tableIdx,&idx);
2920         
2921         table = recvHandleTable[userHandle->senderNode];
2922         CmiAssert(table != NULL);
2923         for(i=0;i<tableIdx;i++){
2924                 table = table->next;
2925         }
2926 #if CMI_DIRECT_DEBUG
2927   CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNode(),userHandle->recverBuf);
2928 #endif  
2929         addHandleToPollingQ(&(table->handles[idx]));
2930         
2931 }
2932
2933
2934 static int receivedDirectMessage(infiDirectHandle *handle){
2935 //      int index = handle->size - sizeof(double);
2936 //      double *lastDouble = (double *)(((char *)handle->buf)+index);
2937         if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
2938                 return 0;
2939         }else{
2940                 (*(handle->callbackFnPtr))(handle->callbackData);       
2941                 return 1;
2942         }
2943         
2944 }
2945
2946
2947 static void pollCmiDirectQ(){
2948         directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
2949         while(ptr != NULL){
2950                 if(receivedDirectMessage(ptr->handle)){
2951 #if CMI_DIRECT_DEBUG
2952       CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNode(),ptr->handle->userHandle.recverBuf);
2953 #endif
2954                         directPollingQNode *delPtr = ptr;
2955                         /** has been received and delete this node***/
2956                         if(prevPtr == NULL){
2957                                 /** first in the pollingQ**/
2958                                 if(headDirectPollingQ == tailDirectPollingQ){
2959                                         /**only node in pollingQ****/
2960                                         headDirectPollingQ = tailDirectPollingQ = NULL;
2961                                 }else{
2962                                         headDirectPollingQ = headDirectPollingQ->next;
2963                                 }
2964                         }else{
2965                                 if(ptr == tailDirectPollingQ){
2966                                         /**last node is being deleted**/
2967                                         tailDirectPollingQ = prevPtr;
2968                                 }
2969                                 prevPtr->next = ptr->next;
2970                         }
2971                         ptr = ptr->next;
2972                 //      free(delPtr);
2973                 }else{
2974                         prevPtr = ptr;
2975                         ptr = ptr->next;
2976                 }
2977         }
2978 }
2979
2980
2981 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
2982         int senderProc = directRequestPacket->senderProc;
2983         int handle = directRequestPacket->handle;
2984         int tableIdx,idx,i;
2985         infiDirectHandleTable *table;
2986         OtherNode node = nodes_by_pe[senderProc];
2987
2988         MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
2989
2990         calcHandleTableIdx(handle,&tableIdx,&idx);
2991
2992         table = recvHandleTable[senderProc];
2993         CmiAssert(table != NULL);
2994         for(i=0;i<tableIdx;i++){
2995                 table = table->next;
2996         }
2997         
2998         CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
2999         
3000         {
3001                 struct ibv_sge list = {
3002                         .addr = (uintptr_t )table->handles[idx].buf,
3003                         .length = table->handles[idx].size,
3004                         .lkey   = table->handles[idx].key->lkey
3005                 };
3006
3007                 struct ibv_send_wr *bad_wr;
3008                 struct ibv_send_wr wr = {
3009                         .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3010                         .sg_list = &list,
3011                         .num_sge = 1,
3012                         .opcode = IBV_WR_RDMA_READ,
3013                         .send_flags = IBV_SEND_SIGNALED,
3014                         .wr.rdma = {
3015                                 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3016                                 .rkey = directRequestPacket->senderKey.rkey
3017                         }
3018                 };
3019 //       post and rdma_read that is a rdma get
3020                 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3021                         CmiAssert(0);
3022                 }
3023         }
3024                         
3025         
3026 };*/
3027 /*
3028 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3029         MACHSTATE(3,"processDirectWC");
3030         infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3031         (*(handle->callbackFnPtr))(handle->callbackData);
3032 };
3033 */
3034
3035 static void sendBarrierMessage(int pe)
3036 {
3037   /* we will only need one packet */
3038   int size=32;
3039   OtherNode  node = nodes + pe;
3040   infiPacket packet;
3041   MallocInfiPacket(packet);
3042   packet->size = size;
3043   packet->buf = CmiAlloc(size);
3044   packet->header.code = INFIBARRIERPACKET;
3045   struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3046   MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3047   /*  pollSendCq(0);*/
3048   EnqueuePacket(node,packet,size,key);
3049 }
3050
3051 static void recvBarrierMessage()
3052 {
3053   int i;
3054   int ne;
3055   /*  struct ibv_wc wc[WC_LIST_SIZE];*/
3056   struct ibv_wc wc[1];
3057   struct ibv_wc *recvWC;
3058   /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3059   int toBuffer=1; // buffer without processing recvd messages
3060   int barrierReached=0;
3061   struct infiBuffer *buffer = NULL;
3062   struct infiPacketHeader *header = NULL;
3063   int nodeNo=-1;
3064   int len=-1;
3065   while(!barrierReached)
3066     {
3067       /* gengbin's semantic will implode if more than one q is polled at a time */
3068       ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3069       //        CmiAssert(ne >=0);
3070       if(ne != 0){
3071         MACHSTATE1(3,"recvBarrier ne %d",ne);
3072       }
3073       pollSendCq(1); 
3074       for(i=0;i<ne;i++){
3075         if(wc[i].status != IBV_WC_SUCCESS){
3076           CmiAssert(0);
3077         }
3078         switch(wc[i].opcode){
3079         case IBV_WC_RECV: /* we have something to consider*/
3080           recvWC=&wc[i];
3081           buffer = (struct infiBuffer *) recvWC->wr_id; 
3082           header = (struct infiPacketHeader *)buffer->buf;
3083           nodeNo = header->nodeNo;
3084           len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3085
3086           if(header->code & INFIPACKETCODE_DATA){
3087             processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3088           }
3089           if(header->code & INFIDUMMYPACKET){
3090             MACHSTATE(3,"Dummy packet");
3091           }
3092           if(header->code & INFIBARRIERPACKET){
3093             MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);      
3094             // now we are done
3095             barrierReached=1;
3096             /* semantically questionable */
3097             //processAllBufferedMsgs();
3098             //return;
3099           }
3100           if(rdma && header->code & INFIRDMA_START){
3101             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3102             //          if(toBuffer){
3103             //TODO: make a function of this and use for both acks and requests
3104             struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3105             struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3106             *copyPacket = *rdmaPacket;
3107             copyPacket->fromNodeNo = nodeNo;
3108             MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3109             context->bufferedRdmaRequests = copyPacket;
3110             copyPacket->next = tmp;
3111             copyPacket->prev = NULL;
3112             if(tmp != NULL){
3113               tmp->prev = copyPacket;
3114             }
3115             /*          }else{
3116                         processRdmaRequest(rdmaPacket,nodeNo,0);
3117                         }*/
3118           }
3119           if(rdma && header->code & INFIRDMA_ACK){
3120             struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3121             processRdmaAck(rdmaPacket);
3122           }
3123           {
3124             struct ibv_sge list = {
3125               .addr     = (uintptr_t) buffer->buf,
3126               .length = buffer->size,
3127               .lkey     = buffer->key->lkey
3128             };
3129         
3130             struct ibv_recv_wr wr = {
3131               .wr_id = (uint64_t)buffer,
3132               .sg_list = &list,
3133               .num_sge = 1,
3134               .next = NULL
3135             };
3136             struct ibv_recv_wr *bad_wr;
3137         
3138             if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3139               CmiAssert(0);
3140             }
3141           }
3142
3143           break;
3144         default:
3145           CmiAbort("Wrong type of work completion object in recvq");
3146           break;
3147         }
3148       }
3149     }
3150   /* semantically questionable */
3151   //  processAllBufferedMsgs();
3152 }
3153
3154
3155 /* happen at node level */
3156 int CmiBarrier()
3157 {
3158   int len, size, i;
3159   int status;
3160   int count = 0;
3161   OtherNode node;
3162   int numnodes = CmiNumNodes();
3163   if (CmiMyRank() == 0) {
3164     /* every one send to pe 0 */
3165     if (CmiMyNode() != 0) {
3166       sendBarrierMessage(0);
3167     }
3168     /* printf("[%d] HERE\n", CmiMyPe()); */
3169     if (CmiMyNode() == 0) 
3170     {
3171       for (count = 1; count < numnodes; count ++) 
3172       {
3173         recvBarrierMessage();
3174       }
3175       /* pe 0 broadcast */
3176       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3177         int p = i;
3178         if (p > numnodes - 1) break;
3179         /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3180         sendBarrierMessage(p);
3181       }
3182     }
3183     /* non 0 node waiting */
3184     if (CmiMyNode() != 0) 
3185     {
3186       recvBarrierMessage();
3187       for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3188         int p = CmiMyNode();
3189         p = BROADCAST_SPANNING_FACTOR*p + i;
3190         if (p > numnodes - 1) break;
3191         p = p%numnodes;
3192         /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3193         sendBarrierMessage(p);
3194       }
3195     }
3196   }
3197   CmiNodeAllBarrier();
3198   processAllBufferedMsgs();
3199   /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3200 }
3201
/* Everyone sends a message to pe 0 and goes on: rank 0 of each node other
   than node 0 sends one barrier packet to node 0 without waiting for a reply,
   while node 0 waits until it has received one packet per other node. All
   ranks then meet in CmiNodeAllBarrier() and the messages buffered meanwhile
   are processed. Always returns 0. */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    }
    else {
      for (i=0; i<CmiNumNodes()-1; i++)
      {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  /* the function is declared int: give callers a defined value */
  return 0;
}
3221