minor
[charm.git] / src / conv-ccs / middle-ccs.C
1 #include <unistd.h>
2 #include "middle.h"
3
4 #if CMK_BIGSIM_CHARM
5 #include "bgconverse.h"
6 #endif
7 #include "ccs-server.h"
8 #include "conv-ccs.h"
9
10 #if CMK_CCS_AVAILABLE
11 extern "C" void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
12
13 extern "C" void req_fw_handler(char *msg)
14 {
15   int offset = CmiReservedHeaderSize + sizeof(CcsImplHeader);
16   CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiReservedHeaderSize);
17   int destPE = (int)ChMessageInt(hdr->pe);
18   if (CmiMyPe() == 0 && destPE == -1) {
19     /* Broadcast message to all other processors */
20     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
21     CmiSyncBroadcast(len, msg);
22   }
23   else if (destPE < -1) {
24     /* Multicast the message to your children */
25     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(ChMessageInt_t);
26     int index, child, i;
27     int *pes = (int*)(msg+CmiReservedHeaderSize+sizeof(CcsImplHeader));
28     ChMessageInt_t *pes_nbo = (ChMessageInt_t *)pes;
29     offset -= destPE * sizeof(ChMessageInt_t);
30     if (ChMessageInt(pes_nbo[0]) == CmiMyPe()) {
31       for (index=0; index<-destPE; ++index) pes[index] = ChMessageInt(pes_nbo[index]);
32     }
33     for (index=0; index<-destPE; ++index) {
34       if (pes[index] == CmiMyPe()) break;
35     }
36     child = (index << 2) + 1;
37     for (i=0; i<4; ++i) {
38       if (child+i < -destPE) {
39         CmiSyncSend(pes[child+i], len, msg);
40       }
41     }
42   }
43   CcsHandleRequest(hdr, msg+offset);
44   CmiFree(msg);
45 }
46
47 #ifdef _MSC_VER
48 extern "C" size_t write(int fd, const void *buf, size_t count);
49 #endif
50
51 extern "C" int rep_fw_handler_idx;
52 /**
53  * Decide if the reply is ready to be forwarded to the waiting client,
54  * or if combination is required (for broadcast/multicast CCS requests.
55  */
56 extern "C" int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
57   int repPE = (int)ChMessageInt(rep->pe);
58   if (repPE <= -1) {
59     /* Reduce the message to get the final reply */
60     CcsHandlerRec *fn;
61     int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
62     char *msg=(char*)CmiAlloc(len);
63     char *r=msg+CmiReservedHeaderSize;
64     char *handlerStr;
65     rep->len = ChMessageInt_new(repLen);
66     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
67     memcpy(r,repData,repLen);
68     CmiSetHandler(msg,rep_fw_handler_idx);
69     handlerStr=rep->handler;
70     fn=(CcsHandlerRec *)CcsGetHandler(handlerStr);
71     if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
72     if (repPE == -1) {
73       /* CCS Broadcast */
74       CkReduce(msg, len, fn->mergeFn);
75     } else {
76       /* CCS Multicast */
77       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
78     }
79   } else {
80     if (_conditionalDelivery == 0) CcsImpl_reply(rep, repLen, repData);
81     else {
82       /* We are the child of a conditional delivery, write to the parent the reply */
83       write(conditionalPipe[1], &repLen, 4);
84       write(conditionalPipe[1], repData, repLen);
85     }
86   }
87   return 0;
88 }
89
90
91 /**********************************************
92   "ccs_getinfo"-- takes no data
93     Return the number of parallel nodes, and
94       the number of processors per node as an array
95       of 4-byte big-endian ints.
96 */
97
98 void ccs_getinfo(char *msg)
99 {
100   int nNode=CmiNumNodes();
101   int len=(1+nNode)*sizeof(ChMessageInt_t);
102   ChMessageInt_t *table=(ChMessageInt_t *)malloc(len);
103   int n;
104   table[0]=ChMessageInt_new(nNode);
105   for (n=0;n<nNode;n++)
106     table[1+n]=ChMessageInt_new(CmiNodeSize(n));
107   CcsSendReply(len,(const char *)table);
108   free(table);
109   CmiFree(msg);
110 }
111
112 ///////////////////////////////// middle-debug.C
113 #endif
114 #if ! CMK_HAS_GETPID
115 typedef int pid_t;
116 #endif
117
118 extern "C" {
119
120 CpvDeclare(void *, debugQueue);
121 CpvDeclare(int, freezeModeFlag);
122
123 /*
124  Start the freeze-- call will not return until unfrozen
125  via a CCS request.
126  */
127 void CpdFreeze(void)
128 {
129   pid_t pid = 0;
130 #if CMK_HAS_GETPID
131   pid = getpid();
132 #endif
133   CpdNotify(CPD_FREEZE,pid);
134   if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
135   CpvAccess(freezeModeFlag) = 1;
136 #if ! CMK_BIGSIM_CHARM
137   CpdFreezeModeScheduler();
138 #endif
139 }
140
141 void CpdUnFreeze(void)
142 {
143   CpvAccess(freezeModeFlag) = 0;
144 }
145
146 int CpdIsFrozen(void) {
147   return CpvAccess(freezeModeFlag);
148 }
149
150 }
151
152 #if CMK_BIGSIM_CHARM
153 #include "blue_impl.h"
154 void BgProcessMessageFreezeMode(threadInfo *t, char *msg) {
155 //  CmiPrintf("BgProcessMessageFreezeMode\n");
156 #if CMK_CCS_AVAILABLE
157   void *debugQ=CpvAccess(debugQueue);
158   CmiAssert(msg!=NULL);
159   int processImmediately = CpdIsDebugMessage(msg);
160   if (processImmediately) BgProcessMessageDefault(t, msg);
161   while (!CpvAccess(freezeModeFlag) && !CdsFifo_Empty(debugQ)) {
162     BgProcessMessageDefault(t, (char*)CdsFifo_Dequeue(debugQ));
163   }
164   if (!processImmediately) {
165     if (!CpvAccess(freezeModeFlag)) BgProcessMessageDefault(t, msg); 
166     else CdsFifo_Enqueue(debugQ, msg);
167   }
168 #else
169   BgProcessMessageDefault(t, msg);
170 #endif
171 }
172 #endif
173
174 void PrintDebugStackTrace(void *);
175 extern "C" void * MemoryToSlot(void *ptr);
176 extern "C" int Slot_StackTrace(void *s, void ***stack);
177 extern "C" int Slot_ChareOwner(void *s);
178
179 #include <stdarg.h>
180 void CpdNotify(int type, ...) {
181   void *ptr; int integer, i;
182   pid_t pid=0;
183   int levels=64;
184   void *stackPtrs[64];
185   void *sl;
186   va_list list;
187   va_start(list, type);
188   switch (type) {
189   case CPD_ABORT:
190     CmiPrintf("CPD: %d Abort %s\n",CmiMyPe(), va_arg(list, char*));
191     break;
192   case CPD_SIGNAL:
193     CmiPrintf("CPD: %d Signal %d\n",CmiMyPe(), va_arg(list, int));
194     break;
195   case CPD_FREEZE:
196 #if CMK_HAS_GETPID
197     pid = getpid();
198 #endif
199     CmiPrintf("CPD: %d Freeze %d\n",CmiMyPe(),pid);
200     break;
201   case CPD_BREAKPOINT:
202     CmiPrintf("CPD: %d BP %s\n",CmiMyPe(), va_arg(list, char*));
203     break;
204   case CPD_CROSSCORRUPTION:
205     ptr = va_arg(list, void*);
206     integer = va_arg(list, int);
207     CmiPrintf("CPD: %d Cross %p %d ",CmiMyPe(), ptr, integer);
208     sl = MemoryToSlot(ptr);
209     if (sl != NULL) {
210       int stackLen; void **stackTrace;
211       stackLen = Slot_StackTrace(sl, &stackTrace);
212       CmiPrintf("%d %d ",Slot_ChareOwner(sl),stackLen);
213       for (i=0; i<stackLen; ++i) CmiPrintf("%p ",stackTrace[i]);
214     } else {
215       CmiPrintf("0 ");
216     }
217     CmiBacktraceRecord(stackPtrs,1,&levels);
218     CmiPrintf("%d ",levels);
219     for (i=0; i<levels; ++i) CmiPrintf("%p ",stackPtrs[i]);
220     CmiPrintf("\n");
221     break;
222   }
223   va_end(list);
224 }