Fixed CpdIsBdCharmDebugMessage with Gengbin
[charm.git] / src / conv-ccs / middle-ccs.C
1 #include "middle.h"
2
3 #if CMK_BLUEGENE_CHARM
4 #include "bgconverse.h"
5 #endif
6 #include "ccs-server.h"
7 #include "conv-ccs.h"
8
9 #if CMK_CCS_AVAILABLE
10 extern "C" void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
11
12 extern "C" void req_fw_handler(char *msg)
13 {
14   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
15   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
16   int destPE = (int)ChMessageInt(hdr->pe);
17   if (CmiMyPe() == 0 && destPE == -1) {
18     /* Broadcast message to all other processors */
19     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
20     CmiSyncBroadcast(len, msg);
21   }
22   else if (destPE < -1) {
23     /* Multicast the message to your children */
24     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
25     int index, child, i;
26     int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
27     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
28     offset -= destPE * sizeof(ChMessageInt_t);
29     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
30       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
31     }
32     for (index=0; index<-destPE; ++index) {
33       if (pes[index] == CmiMyPe()) break;
34     }
35     child = (index << 2) + 1;
36     for (i=0; i<4; ++i) {
37       if (child+i < -destPE) {
38         CmiSyncSend(pes[child+i], len, msg);
39       }
40     }
41   }
42   CcsHandleRequest(hdr, msg+offset);
43   CmiFree(msg);
44 }
45
46 extern "C" int rep_fw_handler_idx;
47 /**
48  * Decide if the reply is ready to be forwarded to the waiting client,
49  * or if combination is required (for broadcast/multicast CCS requests.
50  */
51 extern "C" int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
52   int repPE = (int)ChMessageInt(rep->pe);
53   if (repPE <= -1) {
54     /* Reduce the message to get the final reply */
55     CcsHandlerRec *fn;
56     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
57     char *msg=(char*)CmiAlloc(len);
58     char *r=msg+CmiReservedHeaderSize;
59     char *handlerStr;
60     rep->len = ChMessageInt_new(repLen);
61     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
62     memcpy(r,repData,repLen);
63     CmiSetHandler(msg,rep_fw_handler_idx);
64     handlerStr=rep->handler;
65     fn=(CcsHandlerRec *)CcsGetHandler(handlerStr);
66     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
67     if (repPE == -1) {
68       /* CCS Broadcast */
69       CmiPrintf("calling CkReduce\n");
70       CkReduce(msg, len, fn->mergeFn);
71     } else {
72       /* CCS Multicast */
73       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
74     }
75   } else {
76     CmiPrintf("Ccs replying\n");
77     CcsImpl_reply(rep, repLen, repData);
78   }
79   return 0;
80 }
81 #endif
82
83 /**********************************************
84   "ccs_getinfo"-- takes no data
85     Return the number of parallel nodes, and
86       the number of processors per node as an array
87       of 4-byte big-endian ints.
88 */
89
90 void ccs_getinfo(char *msg)
91 {
92   int nNode=CmiNumNodes();
93   int len=(1+nNode)*sizeof(ChMessageInt_t);
94   ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
95   int n;
96   table[0]=ChMessageInt_new(nNode);
97   for (n=0;n<nNode;n++)
98     table[1+n]=ChMessageInt_new(CmiNodeSize(n));
99   CcsSendReply(len,(const char *)table);
100   free(table);
101   CmiFree(msg);
102 }
103
104 ///////////////////////////////// middle-debug.C
105
106 #if ! CMK_HAS_GETPID
107 typedef int pid_t;
108 #endif
109
110 extern "C" {
111
112 CpvDeclare(void *, debugQueue);
113 CpvDeclare(int, freezeModeFlag);
114
115 /*
116  Start the freeze-- call will not return until unfrozen
117  via a CCS request.
118  */
119 void CpdFreeze(void)
120 {
121   pid_t pid = 0;
122 #if CMK_HAS_GETPID
123   pid = getpid();
124 #endif
125   CpdNotify(CPD_FREEZE,pid);
126   if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
127   CpvAccess(freezeModeFlag) = 1;
128 #if ! CMK_BLUEGENE_CHARM
129   CpdFreezeModeScheduler();
130 #endif
131 }
132
133 void CpdUnFreeze(void)
134 {
135   CpvAccess(freezeModeFlag) = 0;
136 }
137
138 int CpdIsFrozen(void) {
139   return CpvAccess(freezeModeFlag);
140 }
141
142 }
143
144 #if CMK_BLUEGENE_CHARM
145 #include "blue_impl.h"
146 void BgProcessMessageFreezeMode(threadInfo *t, char *msg) {
147 //  CmiPrintf("BgProcessMessageFreezeMode\n");
148 #if CMK_CCS_AVAILABLE
149   void *debugQ=CpvAccess(debugQueue);
150   CmiAssert(msg!=NULL);
151   int processImmediately = CpdIsDebugMessage(msg);
152   if (processImmediately) BgProcessMessageDefault(t, msg);
153   while (!CpvAccess(freezeModeFlag) && !CdsFifo_Empty(debugQ)) {
154     BgProcessMessageDefault(t, (char*)CdsFifo_Dequeue(debugQ));
155   }
156   if (!processImmediately) {
157     if (!CpvAccess(freezeModeFlag)) BgProcessMessageDefault(t, msg); 
158     else CdsFifo_Enqueue(debugQ, msg);
159   }
160 #else
161   BgProcessMessageDefault(t, msg);
162 #endif
163 }
164 #endif
165
166 void PrintDebugStackTrace(void *);
167 extern "C" void * MemoryToSlot(void *ptr);
168 extern "C" int Slot_StackTrace(void *s, void ***stack);
169 extern "C" int Slot_ChareOwner(void *s);
170
171 #include <stdarg.h>
172 void CpdNotify(int type, ...) {
173   void *ptr; int integer, i;
174   pid_t pid=0;
175   int levels=64;
176   void *stackPtrs[64];
177   void *sl;
178   va_list list;
179   va_start(list, type);
180   switch (type) {
181   case CPD_ABORT:
182     CmiPrintf("CPD: %d Abort %s\n",CmiMyPe(), va_arg(list, char*));
183     break;
184   case CPD_SIGNAL:
185     CmiPrintf("CPD: %d Signal %d\n",CmiMyPe(), va_arg(list, int));
186     break;
187   case CPD_FREEZE:
188 #if CMK_HAS_GETPID
189     pid = getpid();
190 #endif
191     CmiPrintf("CPD: %d Freeze %d\n",CmiMyPe(),pid);
192     break;
193   case CPD_BREAKPOINT:
194     CmiPrintf("CPD: %d BP %s\n",CmiMyPe(), va_arg(list, char*));
195     break;
196   case CPD_CROSSCORRUPTION:
197     ptr = va_arg(list, void*);
198     integer = va_arg(list, int);
199     CmiPrintf("CPD: %d Cross %p %d ",CmiMyPe(), ptr, integer);
200     sl = MemoryToSlot(ptr);
201     if (sl != NULL) {
202       int stackLen; void **stackTrace;
203       stackLen = Slot_StackTrace(sl, &stackTrace);
204       CmiPrintf("%d %d ",Slot_ChareOwner(sl),stackLen);
205       for (i=0; i<stackLen; ++i) CmiPrintf("%p ",stackTrace[i]);
206     } else {
207       CmiPrintf("0 ");
208     }
209     CmiBacktraceRecord(stackPtrs,1,&levels);
210     CmiPrintf("%d ",levels);
211     for (i=0; i<levels; ++i) CmiPrintf("%p ",stackPtrs[i]);
212     CmiPrintf("\n");
213     break;
214   }
215   va_end(list);
216 }