5cb411c5bce256de7e2400dbd73644dadbaf331b
[charm.git] / src / conv-ldb / cldb.prioritycentralized.c
1 #include "converse.h"
2 #include "cldb.prioritycentralized.h"
3 #include "queueing.h"
4 #include "cldb.h"
5
6 #include "priorityqueue.c"
7
/* Not referenced in the visible part of this file. */
#define IDLE_IMMEDIATE          1
/* Not referenced in the visible part of this file. */
#define TRACE_USEREVENTS        0

/* Minimum spacing between work requests from a worker whose local queue is
 * empty; CldStillIdle compares (now - lastCheck) against PERIOD*0.001.
 * NOTE(review): presumably milliseconds -- confirm the unit of curT. */
#define PERIOD 50                /* default: 30 */
/* Not referenced in the visible part of this file. */
#define MAXOVERLOAD 1

/* Set to 1 to compile in the CmiPrintf tracing statements. */
#define YH_DEBUG 0
/* A worker stops asking for work, and the master stops handing it work,
 * once its load reaches this many queued tasks. */
#define THRESHOLD_LOAD 5
/* Sentinel priority value used for "no task / nothing known". */
#define _U_INT_MAX 2000000000

/* Not referenced in the visible part of this file. */
#define LOAD_WEIGHT 0.1
/* Weight applied to a worker's average priority in CldStoreCharemsg. */
#define PRIOR_WEIGHT 0.1

/* Per-PE balancer state (lastCheck, sent, load). */
CpvDeclare(CldProcInfo, CldData);
extern char *_lbtopo;                   /* topology name string */
int _lbsteal = 0;                       /* work stealing flag */

/* Heap of tasks the master is holding back until a worker asks for work. */
CpvDeclare(MsgHeap, CldManagerLoadQueue);
CpvDeclare(CldSlavePriorInfo*, CldSlavesPriorityQueue); // maintained on the master: per-PE load and average priority of the tasks sent there

/* Converse handler indices registered in CldModuleInit. */
CpvDeclare(int, CldAskLoadHandlerIndex);
CpvDeclare(int,  CldstorecharemsgHandlerIndex);
CpvDeclare(int, CldHigherPriorityComesHandlerIndex);
/* Initialized in CldModuleInit, but no handler is registered for it there. */
CpvDeclare(int, CldReadytoExecHandlerIndex);
/* Created with CqsCreate() in CldModuleInit; not otherwise used in this file. */
CpvDeclare(void*, CldRequestQueue);
/* Sentinel head of the worker-local list of tokens sorted by priority. */
CpvDeclare(CldPriorityToken, headsentinel);
/* Only its priority field is set (in CldModuleInit); not otherwise used here. */
CpvDeclare(priormsg, nexttoexec);
35
36 void LoadNotifyFn(int l)
37 {
38   CldProcInfo  cldData = CpvAccess(CldData);
39   cldData->sent = 0;
40 }
41
/* Returns the name of this load-balancing strategy. */
char *CldGetStrategy(void)
{
  static char strategy_name[] = "prioritycentralized";

  return strategy_name;
}
46
47 inline void SendTasktoPe(int receiver, void *msg)
48 {
49     CldInfoFn ifn; 
50     CldPackFn pfn;
51     int len, queueing, priobits, avg;
52     unsigned int *prioptr;
53     int old_load;
54     int new_load;
55
56     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
57     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
58     CldRestoreHandler(msg);
59     CldSwitchHandler(msg, CpvAccess(CldHigherPriorityComesHandlerIndex));
60     CmiSyncSendAndFree(receiver, len, msg);
61
62     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
63     new_load = old_load + 1;
64     if(old_load == 0)
65     {
66         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = *prioptr;
67     }else
68     {
69         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority/(new_load)*old_load + *prioptr/(new_load);
70     }
71     CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
72
73 #if YH_DEBUG
74     CmiPrintf(" sending this msg with prior %u  to processor %d len=%d \n", *prioptr, receiver, len);
75 #endif
76 }
77 /* master processor , what to do when receive a new msg from network */
78 static void CldStoreCharemsg(void *msg)
79 {
80     /* insert the message into priority queue*/
81     CldInfoFn ifn; 
82     CldPackFn pfn;
83     int len, queueing, priobits, avg;
84     unsigned int *prioptr;
85     priormsg *p_msg ;
86     /* delay request msg */
87     requestmsg *request_msg;
88     int request_pe;
89     void* loadmsg;
90
91     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
92     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
93 #if YH_DEBUG
94     CmiPrintf(" Step 2: on processor 0, Get new created msg and store it , PRIOR=%u Timer=%f\n", *prioptr, CmiTimer());
95 #endif
96     /* check whether there is processor with lower priority, it exists, push this task to that processor */
97     /* find the processor with the highest priority */
98     int i=0; 
99     unsigned int _max_ = *prioptr;
100     int index = 0;
101     double max_evaluation = 0;
102     int old_load;
103     //check which processor has underload and also the task priority is lower than the msg priority
104     for(i=1; i<CmiNumPes();i++)
105     {   //underload to avoid overflow
106 #if YH_DEBUG
107         CmiPrintf(" processor %d has load num:%d\n", i, CpvAccess(CldSlavesPriorityQueue)[i].load);
108 #endif
109         old_load = CpvAccess(CldSlavesPriorityQueue)[i].load;
110         if(old_load == 0)
111         {
112             index = i;
113             break;
114         }
115         double evaluation = (CpvAccess(CldSlavesPriorityQueue)[i].average_priority)* PRIOR_WEIGHT * (THRESHOLD_LOAD - CpvAccess(CldSlavesPriorityQueue)[i].load);
116         if(evaluation > max_evaluation)
117         {
118             max_evaluation = evaluation;
119             index = i;
120         }
121     }
122     if(old_load == 0 || CpvAccess(CldSlavesPriorityQueue)[index].average_priority > *prioptr)
123     {
124         //send task to that processor
125         SendTasktoPe(index, msg);
126 #if YH_DEBUG
127         CmiPrintf(" Step 2-1: processor 0 send task to idle processor %d, msg prior=%u Timer=%f\n", index, *prioptr, CmiTimer());
128 #endif
129         return;
130     }
131
132     p_msg = (priormsg*)malloc(sizeof(priormsg));
133     p_msg->priority = *prioptr;
134     p_msg->msg = msg;
135     /*Lock here? */
136     if(heap_isFull(&CpvAccess(CldManagerLoadQueue)))
137     {
138         CmiPrintf("Queue is already full, message will be lost\n");
139     }
140     else
141         heap_addItem(&CpvAccess(CldManagerLoadQueue), p_msg);
142 #if YH_DEBUG
143         CmiPrintf(" Step 2-3:  processor 0 , all processors are busy , store this msg  msg prior=%u Queuesize=%d Timer=%f\n", *prioptr, heap_size(&CpvAccess(CldManagerLoadQueue)), CmiTimer());
144 #endif
145
146 }
147 /* immediate message handler, work at node level */
148 /* send some work to requested proc */
149 static void CldAskLoadHandler(requestmsg *msg)
150 {
151     /* pickup the msg with the highest priority */
152     /* response to the requester chare */
153     int receiver, rank, recvIdx, i;
154     void* loadmsg;
155     CldInfoFn ifn; 
156     CldPackFn pfn;
157     int len, queueing, priobits, avg; 
158     unsigned int *prioptr;
159     
160     double old_load;
161     double new_load;
162     double old_average_prior;
163     /* only give you work if I have more than 1 */
164     receiver = msg->from_pe;
165     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
166     old_average_prior = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority;
167 #if YH_DEBUG
168     CmiPrintf(" Step 6 :%f %d<======= getrequest  from processor queue current size=%d, notidle=%d, load=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)), msg->notidle, CpvAccess(CldSlavesPriorityQueue)[receiver].load);
169 #endif
170     if(msg->notidle){
171         if(old_load == 0 || old_load == 1)
172         {
173             CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = _U_INT_MAX;
174             CpvAccess(CldSlavesPriorityQueue)[receiver].load = 0;
175         }else
176         {
177             new_load = old_load - 1;
178             CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
179             CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = old_average_prior/new_load * old_load - msg->priority/new_load;
180         }
181     } 
182    
183     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
184     if(old_load < THRESHOLD_LOAD)
185     {
186         priormsg *p_msg = heap_extractMin(&CpvAccess(CldManagerLoadQueue));
187         if(p_msg == 0)
188         {
189 #if YH_DEBUG
190             CmiPrintf(" Step 6-1 :%f Queue is empty no task %d<======= getrequest  from processor queue current size=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)));
191 #endif
192         return;
193         }
194         
195         loadmsg = p_msg->msg;
196         SendTasktoPe(receiver, loadmsg);
197     }
198 }
199
200 /***********************/
201 /* since I am idle, ask for work from neighbors */
202 static void CldBeginIdle(void *dummy)
203 {
204     CpvAccess(CldData)->lastCheck = CmiWallTimer();
205 }
206
207 static void CldEndIdle(void *dummy)
208 {
209     CpvAccess(CldData)->lastCheck = -1;
210 }
211
212 static void CldStillIdle(void *dummy, double curT)
213 {
214
215
216     if(CmiMyPe() == 0) 
217     {
218 #if YH_DEBUG
219         CmiPrintf(" Processor %d is idle, queue size=%d \n", CmiMyPe(), heap_size(&CpvAccess(CldManagerLoadQueue)) );
220 #endif
221         return;
222     }else
223     {
224 #if YH_DEBUG
225         CmiPrintf("Processor %d, has task number of %d\n", CmiMyPe(), CpvAccess(CldData)->load); 
226 #endif
227     }
228
229     int i;
230     double startT;
231     requestmsg msg;
232     CldProcInfo  cldData = CpvAccess(CldData);
233     double now = curT;
234     double lt = cldData->lastCheck;
235    
236     
237     CldPriorityToken  head = CpvAccess(headsentinel);
238     CldPriorityToken exectoken;
239     unsigned int priority = _U_INT_MAX; 
240     exectoken = CpvAccess(headsentinel)->succ;
241     if(exectoken)
242     {
243         CldRestoreHandler(exectoken->msg); 
244         CmiHandleMessage(exectoken->msg);
245         head->succ= exectoken->succ;
246         CmiFree(exectoken);
247         cldData->load = cldData->load - 1;
248         msg.notidle = 1;
249 #if YH_DEBUG
250         CmiPrintf("Step 1000: processor %d finishes processing one msg =>Send request for task,  last check time %f, current time=%f, notidle11=%d, load=%d\n", CmiMyPe(), lt, now, msg.notidle, cldData->load);
251 #endif
252     }else
253     {
254         cldData->load  = 0;
255         msg.notidle = 0;
256         if ((lt!=-1 && now-lt< PERIOD*0.001) ) return;
257 #if YH_DEBUG
258         CmiPrintf("Step 1000: processor %d task is already zero ", CmiMyPe());
259 #endif
260     }
261
262     if(cldData->load >= THRESHOLD_LOAD)
263     {
264 #if YH_DEBUG
265         CmiPrintf(" worker processor %d finishes processing one msg =>Send request for task,  last check time %f, current time=%f, load=%d\n", CmiMyPe(), lt, now, cldData->load);
266 #endif
267         return;
268     }
269     cldData->lastCheck = now;
270     msg.from_pe = CmiMyPe();
271     CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
272
273     cldData->sent = 1;
274     /* fixme */
275     //CmiBecomeImmediate(&msg);
276     //msg.to_rank = CmiRankOf(0);
277 #if YH_DEBUG
278     CmiPrintf("Step 1000: processor %d task is already zero sentidle=%d", CmiMyPe(), (&msg)->notidle);
279 #endif
280     CmiSyncSend(0, sizeof(requestmsg), &msg);
281 }
282 /*
283 void CldReadytoExec(CldPriorityToken readytoken)
284 {
285     CldPriorityToken  head = CpvAccess(headsentinel);
286     CldPriorityToken exectoken;
287     unsigned int priority = _U_INT_MAX; 
288     exectoken = CpvAccess(headsentinel)->succ;
289 #if YH_DEBUG
290     CmiPrintf("Step 4: processor %d, task is ready to run with priority=%u, Timer=%f\n", CmiMyPe(), readytoken->priority, CmiTimer());
291 #endif
292
293     if(exectoken)
294     {
295         priority = exectoken->priority;
296 #if YH_DEBUG
297     CmiPrintf("Step 4-1: processor %d, task is ready to run with priority=%u, Timer=%f\n", CmiMyPe(), priority, CmiTimer());
298 #endif
299         CldRestoreHandler(exectoken->msg); 
300         CmiHandleMessage(exectoken->msg);
301         head->succ= exectoken->succ;
302         CmiFree(exectoken);
303     }
304 #if YH_DEBUG
305     CmiPrintf("Step 4-2: processor %d, task is done Timer=%f\n", CmiMyPe(), CmiTimer());
306 #endif
307 #if YH_DEBUG
308     CmiPrintf("Step 5: processor %d, send request to processor 0 to ask for load Timer=%f\n", CmiMyPe(), CmiTimer());
309 #endif
310     requestmsg updatemsg;
311
312     CldInfoFn ifn;
313     CldPackFn pfn;
314     int len, queueing, priobits; 
315     updatemsg.priority = priority;
316     updatemsg.from_pe = CmiMyPe();
317     CmiSetHandler(&updatemsg, CpvAccess(CldAskLoadHandlerIndex));
318     //CmiBecomeImmediate(&updatemsg);
319     CmiSyncSend(0, sizeof(requestmsg), &updatemsg);
320 }
321 */
322 void HigherPriorityWork(void *msg)
323 {
324     //wrap this msg with token and put it into token queue
325     
326     CldInfoFn ifn;
327     CldPackFn pfn;
328     int len, queueing, priobits; 
329     unsigned int *prioptr;
330     CldPriorityToken priortoken;
331     CldProcInfo  cldData = CpvAccess(CldData);
332     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
333     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
334
335 #if YH_DEBUG
336         CmiPrintf(" Step 3:  processor %d, Task arrives and sort the task first, prior=%u Timer=%f\n", CmiMyPe(), *prioptr, CmiTimer());
337 #endif
338     priortoken = (CldPriorityToken) CmiAlloc(sizeof(struct CldPriorityToken_s));
339     priortoken->msg = msg;
340     priortoken->priority = *prioptr; 
341     CldPriorityToken head = CpvAccess(headsentinel);
342     CldPriorityToken  tmptoken = head;//
343     CldPriorityToken  predtoken;
344     if(head->succ == NULL)
345     {
346         head->succ = priortoken;
347         priortoken->succ = NULL;
348         cldData->load = 1;
349     }else  //insert this token into queue accord to prior smaller ->higher
350     {
351         do{
352             predtoken = tmptoken;
353             tmptoken = tmptoken->succ;
354         }while(tmptoken != NULL && tmptoken->priority<= *prioptr);
355         priortoken->succ = tmptoken;
356         predtoken->succ = priortoken;
357         cldData->load = cldData->load  + 1;
358     }
359     //CmiSetHandler(priortoken, CpvAccess(CldReadytoExecHandlerIndex));
360     //CmiSyncSend(CmiMyPe(), sizeof(struct CldPriorityToken_s), priortoken); 
361 #if YH_DEBUG
362         CmiPrintf(" Step 3-1:  processor %d, Task has been inserted and re-Queue prior=%u Timer=%f, enqueue load=%d\n", CmiMyPe(), *prioptr, CmiTimer(), cldData->load);
363 #endif
364 }
365
366
367 void CldEnqueue(int pe, void *msg, int infofn)
368 {
369   int len, queueing, priobits, avg; 
370   unsigned int *prioptr;
371   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
372   CldPackFn pfn;
373  
374   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
375   CmiSetInfo(msg,infofn);
376 #if YH_DEBUG
377   CmiPrintf(" Step 1: Creation New msg on pe %d priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(),  *prioptr, CmiTimer(), len);
378 #endif
379
380   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
381       pe = CmiMyPe();
382     /* always pack the message because the message may be move away
383        to a different processor later by CldGetToken() */
384       CldSwitchHandler(msg, CpvAccess(CldstorecharemsgHandlerIndex));
385       if(pe == 0)
386       {
387           CldStoreCharemsg(msg);
388       }else{
389           if (pfn && CmiNumNodes()>1) {
390               pfn(&msg);
391               ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
392           }
393           CmiSyncSendAndFree(0, len, msg);
394       }
395   }else if((pe == CmiMyPe()) || (CmiNumPes() == 1) ) {
396   
397       CsdEnqueueGeneral(msg, CQS_QUEUEING_IFIFO, priobits, prioptr);
398   }else {
399       if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
400           pfn(&msg);
401           ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
402       }
403       if (pe==CLD_BROADCAST) 
404           CmiSyncBroadcastAndFree(len, msg);
405       else if (pe==CLD_BROADCAST_ALL)
406           CmiSyncBroadcastAllAndFree(len, msg);
407       else CmiSyncSendAndFree(pe, len, msg);
408
409   }
410 }
411
412 void CldHandler(char *msg)
413 {
414   int len, queueing, priobits;
415   unsigned int *prioptr;
416   CldInfoFn ifn; CldPackFn pfn;
417   CldRestoreHandler(msg);
418   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
419   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
420   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
421 }
422
423 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
424 {
425   int len, queueing, priobits,i; 
426   unsigned int *prioptr;
427   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
428   CldPackFn pfn;
429   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
430   if (pfn) {
431     pfn(&msg);
432     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
433   }
434   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
435   CmiSetInfo(msg,infofn);
436
437   CmiSyncMulticastAndFree(grp, len, msg);
438 }
439
440 void  CldOtherInit()
441 {
442
443   CpvInitialize(CldProcInfo, CldData);
444   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
445   CpvAccess(CldData)->lastCheck = -1;
446   CpvAccess(CldData)->sent = 0;
447   CpvAccess(CldData)->load = 0;
448 #if 1
449   _lbsteal = 1;//CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
450   if (_lbsteal) {
451   /* register idle handlers - when idle, keep asking work from neighbors */
452   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
453       (CcdVoidFn) CldBeginIdle, NULL);
454   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
455       (CcdVoidFn) CldStillIdle, NULL);
456   CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
457       (CcdVoidFn) CldEndIdle, NULL);
458     if (CmiMyPe() == 0) 
459       CmiPrintf("Charm++> Work stealing is enabled. \n");
460   }
461 #endif
462     
463
464   if (CmiMyPe() == 0){
465       int numpes = CmiNumPes();
466       CpvAccess(CldSlavesPriorityQueue) = (CldSlavePriorInfo*)CmiAlloc(sizeof(CldSlavePriorInfo) * numpes);
467       int i=0;
468       for(i=0; i<numpes; i++){
469           CpvAccess(CldSlavesPriorityQueue)[i].priority_1 = _U_INT_MAX;
470           CpvAccess(CldSlavesPriorityQueue)[i].priority_2 = _U_INT_MAX;
471           CpvAccess(CldSlavesPriorityQueue)[i].load = 0;
472       }
473   }
474 }
475
476 void CldModuleInit(char **argv)
477 {
478
479   CpvInitialize(int, CldHandlerIndex);
480   CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
481   /* Yanhua */
482   CpvInitialize(int, CldAskLoadHandlerIndex);
483   CpvInitialize(int, CldstorecharemsgHandlerIndex);
484   CpvInitialize(int, CldHigherPriorityComesHandlerIndex);
485   CpvInitialize(int, CldReadytoExecHandlerIndex);
486   CpvInitialize(MsgHeap, CldManagerLoadQueue);
487   CpvInitialize(CldSlavePriorInfo*, CldSlavesPriorityQueue);
488   CpvInitialize(void*, CldRequestQueue);
489   CpvInitialize(priormsg, nexttoexec);
490   CpvInitialize(CldPriorityToken, headsentinel);
491
492
493   CpvAccess(headsentinel) = (CldPriorityToken) CmiAlloc(sizeof(struct CldPriorityToken_s));
494   CpvAccess(headsentinel)->succ = NULL;
495   CpvAccess(nexttoexec).priority = _U_INT_MAX;
496
497   CpvAccess(CldstorecharemsgHandlerIndex) = CmiRegisterHandler(CldStoreCharemsg);
498   CpvAccess(CldHigherPriorityComesHandlerIndex) = CmiRegisterHandler(HigherPriorityWork);
499   CpvAccess(CldAskLoadHandlerIndex) = CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
500   //CpvAccess(CldReadytoExecHandlerIndex) = CmiRegisterHandler((CmiHandler)CldReadytoExec);
501   CpvAccess(CldRequestQueue) = (void *)CqsCreate();
502   CldModuleGeneralInit(argv);
503   
504   CldOtherInit();
505   
506   CpvAccess(CldLoadNotify) = 1;
507   //CpvAccess(tokenqueue)->head->succ = CpvAccess(tokenqueue)->tail;
508
509  
510 }
511
512
513
514 void CldNodeEnqueue(int node, void *msg, int infofn)
515 {
516   int len, queueing, priobits; 
517   unsigned int *prioptr;
518   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
519   CldPackFn pfn;
520   if (node == CLD_ANYWHERE) {
521     node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
522     if (node != CmiMyNode())
523       CpvAccess(CldRelocatedMessages)++;
524   }
525   if (node == CmiMyNode() && !CmiImmIsRunning()) {
526     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
527     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
528   } else {
529     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
530     if (pfn) {
531       pfn(&msg);
532       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
533     }
534     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
535     CmiSetInfo(msg,infofn);
536     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
537     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
538     else CmiSyncNodeSendAndFree(node, len, msg);
539   }
540 }
541
542 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
543 {
544   int len, queueing, priobits,i; 
545   unsigned int *prioptr;
546   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
547   CldPackFn pfn;
548   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
549   if (pfn) {
550     pfn(&msg);
551     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
552   }
553   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
554   CmiSetInfo(msg,infofn);
555
556   CmiSyncListSendAndFree(npes, pes, len, msg);
557 }
558
559