f13a248cb89ad4ab9927372bed246458295ea5dc
[charm.git] / src / conv-ccs / middle-ccs.C
1 #include "middle.h"
2
3 #if CMK_BLUEGENE_CHARM
4 #include "bgconverse.h"
5 #endif
6 #include "ccs-server.h"
7 #include "conv-ccs.h"
8
9 extern "C" void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
10
11 extern "C" void req_fw_handler(char *msg)
12 {
13   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
14   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
15   int destPE = (int)ChMessageInt(hdr->pe);
16   if (CmiMyPe() == 0 && destPE == -1) {
17     /* Broadcast message to all other processors */
18     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
19     CmiSyncBroadcast(len, msg);
20   }
21   else if (destPE < -1) {
22     /* Multicast the message to your children */
23     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
24     int index, child, i;
25     int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
26     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
27     offset -= destPE * sizeof(ChMessageInt_t);
28     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
29       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
30     }
31     for (index=0; index<-destPE; ++index) {
32       if (pes[index] == CmiMyPe()) break;
33     }
34     child = (index << 2) + 1;
35     for (i=0; i<4; ++i) {
36       if (child+i < -destPE) {
37         CmiSyncSend(pes[child+i], len, msg);
38       }
39     }
40   }
41   CcsHandleRequest(hdr, msg+offset);
42   CmiFree(msg);
43 }
44
45 extern "C" void CcsSendReply(int replyLen, const void *replyData);
46 extern int rep_fw_handler_idx;
47 /**
48  * Decide if the reply is ready to be forwarded to the waiting client,
49  * or if combination is required (for broadcast/multicast CCS requests.
50  */
51 extern "C" int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
52   int repPE = (int)ChMessageInt(rep->pe);
53   if (repPE <= -1) {
54     /* Reduce the message to get the final reply */
55     CcsHandlerRec *fn;
56     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
57     char *msg=(char*)CmiAlloc(len);
58     char *r=msg+CmiReservedHeaderSize;
59     char *handlerStr;
60     rep->len = ChMessageInt_new(repLen);
61     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
62     memcpy(r,repData,repLen);
63     CmiSetHandler(msg,rep_fw_handler_idx);
64     handlerStr=rep->handler;
65     fn=(CcsHandlerRec *)CcsGetHandler(handlerStr);
66     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
67     if (repPE == -1) {
68       /* CCS Broadcast */
69       CkReduce(msg, len, fn->mergeFn);
70     } else {
71       /* CCS Multicast */
72       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
73     }
74   } else {
75     CcsImpl_reply(rep, repLen, repData);
76   }
77 }
78
79 /**********************************************
80   "ccs_getinfo"-- takes no data
81     Return the number of parallel nodes, and
82       the number of processors per node as an array
83       of 4-byte big-endian ints.
84 */
85
86 void ccs_getinfo(char *msg)
87 {
88   int nNode=CmiNumNodes();
89   int len=(1+nNode)*sizeof(ChMessageInt_t);
90   ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
91   int n;
92   table[0]=ChMessageInt_new(nNode);
93   for (n=0;n<nNode;n++)
94     table[1+n]=ChMessageInt_new(CmiNodeSize(n));
95   CcsSendReply(len,(const char *)table);
96   free(table);
97   CmiFree(msg);
98 }
99
100 //////////////////////////////////////////////////////////////////// middle-debug.C
101
102 extern "C" {
103
104 CpvDeclare(void *, debugQueue);
105 CpvDeclare(int, freezeModeFlag);
106
107 /*
108  Start the freeze-- call will not return until unfrozen
109  via a CCS request.
110  */
111 void CpdFreeze(void)
112 {
113   CpdNotify(CPD_FREEZE,getpid());
114   if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
115   CpvAccess(freezeModeFlag) = 1;
116 #if ! CMK_BLUEGENE_CHARM
117   CpdFreezeModeScheduler();
118 #endif
119 }
120
121 void CpdUnFreeze(void)
122 {
123   CpvAccess(freezeModeFlag) = 0;
124 }
125
126 int CpdIsFrozen(void) {
127   return CpvAccess(freezeModeFlag);
128 }
129
130 }
131
132 #if CMK_BLUEGENE_CHARM
133 #include "blue_impl.h"
134 void BgProcessMessageFreezeMode(threadInfo *t, char *msg) {
135 //  CmiPrintf("BgProcessMessageFreezeMode\n");
136 #if CMK_CCS_AVAILABLE
137   void *debugQ=CpvAccess(debugQueue);
138   CmiAssert(msg!=NULL);
139   int processImmediately = CpdIsDebugMessage(msg);
140   if (processImmediately) BgProcessMessageDefault(t, msg);
141   while (!CpvAccess(freezeModeFlag) && !CdsFifo_Empty(debugQ)) {
142     BgProcessMessageDefault(t, (char*)CdsFifo_Dequeue(debugQ));
143   }
144   if (!processImmediately) {
145     if (!CpvAccess(freezeModeFlag)) BgProcessMessageDefault(t, msg); 
146     else CdsFifo_Enqueue(debugQ, msg);
147   }
148 #else
149   BgProcessMessageDefault(t, msg);
150 #endif
151 }
152 #endif