Main Page   Class Hierarchy   Data Structures   File List   Data Fields   Globals  

MPI_sigma.C

Go to the documentation of this file.
00001 
00002 
00003 
00004 //
00005 //                     Simon Portegies Zwart   MIT  Nov 2000
00006 
00007 #include "sigma_MPI.h"
00008 
00009 #ifndef TOOLBOX
00010 
00011 #ifdef USE_MPI
00012 
00013 void initialize_MPI(int &myid, int &nproc) {
00014 
00015   int argc = 0;
00016   char **argv = NULL;
00017   MPI::Init(argc, argv);
00018 
00019   myid = MPI::COMM_WORLD.Get_rank();
00020   nproc = MPI::COMM_WORLD.Get_size();
00021 }
00022 
void finalize_MPI() {

  // Shut down the MPI environment; no MPI calls may follow this.
  MPI::Finalize();
}
00027 
00028 MPI_Datatype sigma_input::initialize_data_structures_MPI() {
00029 
00030   // MPI definition of datatype input_type
00031   int inputblockcounts[4] = {255, 7, 7, 7};
00032   MPI::Aint displs[4];  
00033   MPI_Datatype inputtype;
00034 
00035   MPI_Address(&init_string, &displs[0]);
00036   MPI_Address(&delta_t, &displs[1]);
00037   MPI_Address(&n_experiments, &displs[2]);
00038   MPI_Address(&rho_sq_min, &displs[3]);
00039   MPI_Datatype ntypes[4] = {MPI::CHAR, MPI::DOUBLE, MPI::INT, MPI::DOUBLE};
00040   displs[1] -= displs[0];
00041   displs[2] -= displs[0];
00042   displs[3] -= displs[0];
00043   displs[0] = 0;
00044 
00045   MPI_Type_struct(4, inputblockcounts, displs, ntypes, &inputtype);
00046   MPI_Type_commit(&inputtype);
00047 
00048   return inputtype;
00049 }
00050 
00051 #else // No MPI
00052 
void initialize_MPI(int &myid, int &nproc) { 
  // Serial build: behave like a one-process MPI world.  The original
  // stub left both outputs untouched, so callers saw an uninitialized
  // nproc.
  myid = 0;
  nproc = 1;
}
00056 
void finalize_MPI() {
  // do nothing (serial build: no MPI environment to shut down)
}
00060 
00061 #endif
00062 
00063 void execute_sigma_experiment(sigma_input &input) {
00064 
00065   int myid=0, nproc;
00066   initialize_MPI(myid, nproc);
00067 
00068   scatter_exp experiment; 
00069 
00070   MPI_Datatype inputtype = input.initialize_data_structures_MPI();
00071   MPI_Datatype scatter_exp_type = experiment.initialize_data_structures_MPI();
00072 
00073   int master = 0;
00074   if(myid==master) {
00075     get_sigma(input, inputtype, experiment, scatter_exp_type);
00076   }
00077   else {
00078     slave_process(master, 
00079                   input, inputtype, 
00080                   experiment, scatter_exp_type);
00081   }
00082 
00083   finalize_MPI();
00084 
00085 }
00086 
00087 local void slave_part_of_experiment(sigma_input input,
00088                                     scatter_exp &experiment) {
00089 
00090   int random_seed = srandinter(input.seed, input.n_rand);
00091 
00092   if(input.verbose==1) 
00093     cerr << "Random seed = " << get_initial_seed()
00094          << "  n_rand = " << get_n_rand() << flush << "\n";
00095     
00096   sdyn *b = mkscat(input.init_string, input);
00097     
00098   b->flatten_node();
00099     
00100   real kin, pot;
00101   real etot_init = calculate_energy_from_scratch(b, kin, pot);
00102   if(input.verbose==1) 
00103     cerr << "Time = " << 0 << "  Etot = " << kin + pot 
00104          << " (" << kin << ", " << pot
00105          << ")  Tinf = " << kin/pot << endl;
00106   make_tree(b, !DYNAMICS, STABILITY, K_MAX, input.debug);
00107   b->set_name("root");
00108   
00109   // Initial output:
00110     
00111   if(input.verbose==2) {
00112     cerr << "*** Initial configuration (random seed = "
00113          << get_initial_seed() << "):\n";
00114 
00115     print_normal_form(b, cerr);
00116     vector center = b->get_pos();
00117     print_structure_recursive(b, 0., center, true, true, 4);
00118   }
00119     
00120   // Integrate the system to completion:
00121   int hit = single_scatter(b, (scatter_input)input, experiment);
00122 
00123   //  PRC(hit);
00124   //  PRC(experiment.get_nzone());
00125   //  PRL(experiment.get_nhits(experiment.get_nzone()));
00126 
00127   cerr.precision(6);
00128   if (input.debug) cerr << endl;
00129 
00130   if(input.verbose==2) {
00131     cerr << "*** Final system configuration (time = " 
00132          << b->get_time() << "):\n";
00133     cerr << "Final Normal form:  ";
00134     print_normal_form(b, cerr);
00135 
00136     ppn(b, cerr);
00137     vector center = b->get_pos();
00138     print_structure_recursive(b, 0., center, true, true, 4);
00139 
00140     cerr << "Energy error = " << experiment.get_energy_error() << endl;
00141   }
00142 
00143   print_scatter_specific_information(b, input, experiment);
00144 
00145   delete b;
00146 
00147 }
00148 
00149 #ifdef USE_MPI
void slave_process(int master, 
                   sigma_input &input, MPI_Datatype inputtype,
                   scatter_exp &experiment, MPI_Datatype scatter_exp_type) {

  // Slave side of the experiment farm: repeatedly request work from the
  // master, run one scattering experiment per job, and send the result
  // back.  Returns when the master answers a work request with
  // data_available <= 0 (see terminate_all_processors()).

  int myid = MPI::COMM_WORLD.Get_rank();
  MPI::Status status;  

  int length = 1;

  int request_for_data = 1;
  int data_available;
  int sending_data;

    do {

      // Request a new job; the payload is this slave's rank so the
      // master knows whom to reply to.
      request_for_data = myid;
      //      cerr << "Slave " << myid << " requests new job "<<endl;

      MPI::COMM_WORLD.Send(&request_for_data, length, MPI::INT, master, 
                           DATA_REQUEST_TAG);
      MPI::COMM_WORLD.Recv(&data_available, length, MPI::INT, master, 
                           DATA_REQUEST_TAG, status); 
      //      PRC(myid);PRL(data_available);
      if(data_available == SEND_HOLD_TAG) {
        // Master has no work left for now: block until a STOP_HOLD_TAG
        // message arrives, then re-issue the work request.
        //      cerr << "Slave " << myid << " is put on hold until furthre notice"<<endl;
        MPI::COMM_WORLD.Recv(&data_available, length, MPI::INT, master, 
                             STOP_HOLD_TAG, status); 
        //      cerr << "Slave " << myid << " continues his work"<<endl;
        MPI::COMM_WORLD.Send(&request_for_data, length, MPI::INT, master, 
                             DATA_REQUEST_TAG);
        MPI::COMM_WORLD.Recv(&data_available, length, MPI::INT, master, 
                             DATA_REQUEST_TAG, status); 
      }

      // data_available <= 0 is the termination signal.
      if(data_available<=0) {
        cerr << "Terminate slave "<< myid<<endl;
        return;
      }

      //      cerr << "Slave " << myid << " waiting for data "<<endl;

      // Receive the job: the input specification plus the experiment
      // template, each as a single struct-typed message.
      MPI::COMM_WORLD.Recv(&input, length, inputtype, master, 
                           DATA_COMING_TAG);
      MPI::COMM_WORLD.Recv(&experiment, length, scatter_exp_type, master, 
                           DATA_COMING_TAG);

      //      cerr << "Slave " << myid << " received data (now working) "<<endl;
      slave_part_of_experiment(input, experiment);
        
      // Announce the result (rank as payload), then send it.
      //      cerr << "Slave " << myid << " requests to send data "<<endl;
      sending_data = myid;
      MPI::COMM_WORLD.Send(&sending_data, length, MPI::INT, master, 
                           DATA_COMING_TAG);
      MPI::COMM_WORLD.Send(&experiment, length, scatter_exp_type, 
                             master, DATA_COMING_TAG);
    }
    while(true);

}
00209 #else
void slave_process(int master, 
                   sigma_input &input, MPI_Datatype inputtype,
                   scatter_exp &experiment, MPI_Datatype scatter_exp_type) {

      // Serial build: no farm; run the experiment in this process.
      slave_part_of_experiment(input, experiment);
}
00216 #endif
00217 
00218 #ifdef USE_MPI
00219 void terminate_all_processors() {
00220 
00221   cerr << "Start terminating all processes"<<endl;
00222 
00223   int myid = MPI::COMM_WORLD.Get_rank();
00224   int nproc = MPI::COMM_WORLD.Get_size();
00225 
00226   int accept_all_senders = MPI::ANY_SOURCE;
00227 
00228   int slave;
00229   int request_for_data;
00230   int length = 1;
00231   int data_available = 0;
00232   for(int i=1; i<nproc; i++) {
00233 
00234       MPI::COMM_WORLD.Recv(&request_for_data, length, MPI::INT,
00235                            accept_all_senders, DATA_REQUEST_TAG);
00236     
00237       slave = request_for_data;
00238       MPI::COMM_WORLD.Send(&data_available, length, MPI::INT, slave, 
00239                            DATA_REQUEST_TAG);
00240   }
00241 
00242   cerr << "All processes terminated" << endl;
00243 }
00244 #else
void terminate_all_processors() {
  // dummy (serial build: no worker processes to terminate)
}
00248 #endif
00249 
00250 #ifdef USE_MPI
00251 int master_process(sigma_out & out,
00252                    sigma_input &input, MPI_Datatype inputtype,
00253                    scatter_exp &experiment, MPI_Datatype scatter_exp_type) {
00254 
00255   int myid = MPI::COMM_WORLD.Get_rank();
00256   int nproc = MPI::COMM_WORLD.Get_size();
00257   MPI::Status status;  
00258 
00259   // keep track of holding processes
00260   bool *hold      = new bool[nproc];
00261   real *hold_time = new real[nproc];
00262   for(int i=0; i<nproc; i++) {
00263     hold[i] = false;
00264     hold_time[i] = 0;
00265   }
00266 
00267   int accept_all_senders = MPI::ANY_SOURCE;
00268   int accept_all_tags    = MPI::ANY_TAG;
00269 
00270   int length = 1;
00271   int nsend = 0;
00272 
00273   int n_exp = 0;
00274 
00275   int slave, tag;
00276   int request_for_data;
00277   int data_available = 1;
00278   int slave_sends_data;
00279 
00280   do {
00281 
00282     real cpu_init = cpu_time();
00283     MPI::COMM_WORLD.Recv(&request_for_data, length, MPI::INT,
00284                          accept_all_senders, accept_all_tags, status);
00285 
00286     slave = status.Get_source();
00287     tag = status.Get_tag();
00288 
00289     //    cerr << "Master received request from slave " << slave 
00290     //   << " for " << tag << endl;
00291 
00292     switch(tag) {
00293     case DATA_REQUEST_TAG:
00294       if(nsend>=input.n_experiments) {
00295         //      cerr << "Put process " << slave << " on hold" << endl;
00296         hold[slave] = true;
00297         hold_time[slave] = cpu_time();
00298         int please_hold = SEND_HOLD_TAG;
00299         MPI::COMM_WORLD.Send(&please_hold, length, MPI::INT, 
00300                              slave, DATA_REQUEST_TAG);
00301 
00302       }
00303       else {
00304         //      cerr << "Master: Slave " << slave << " requests data"<<endl;
00305         MPI::COMM_WORLD.Send(&data_available, length, MPI::INT, slave, 
00306                              DATA_REQUEST_TAG);
00307         input.n_rand += input.n_rand_inc;
00308         
00309         MPI::COMM_WORLD.Send(&input, length, inputtype, slave, 
00310                              DATA_COMING_TAG);
00311         MPI::COMM_WORLD.Send(&experiment, length, scatter_exp_type, slave, 
00312                              DATA_COMING_TAG);
00313         nsend++;
00314       }
00315       break;
00316     case WRITE_REQUEST_TAG:
00317       //      cerr << "Master: Slave " << slave << " requests to write"<<endl;
00318       MPI::COMM_WORLD.Send(&data_available, length, MPI::INT, 
00319                            slave, WRITE_REQUEST_TAG);
00320       MPI::COMM_WORLD.Recv(&request_for_data, length, MPI::INT,
00321                            slave, WRITE_READY_TAG, status);
00322       break;
00323     case DATA_COMING_TAG:
00324       //      cerr << "Master: Slave " << slave << " requests send data"<<endl;
00325       MPI::COMM_WORLD.Recv(&experiment, length, scatter_exp_type, 
00326                            slave,  DATA_COMING_TAG, status);
00327       single_scatter_stats(&experiment, out);
00328       cerr << "Experiment (" << out.n_zone << ") #" << n_exp << ": ";
00329       cerr << " cpu= " << cpu_time() - cpu_init
00330            << "   Time = " << experiment.get_time() 
00331            << "   result: " << experiment.get_final_form() 
00332            << " (" << experiment.get_scatter_discriptor() << ")"
00333            << endl;
00334       n_exp++;
00335       break;
00336     };
00337     
00338     //    PRC(out.n_zone);PRC(nsend);PRC(n_exp);PRL(input.n_experiments);
00339   }
00340   while(n_exp < input.n_experiments);
00341 
00342   // resume holding processes.
00343   for(slave=1; slave<nproc; slave++) {
00344     if(hold[slave]) {
00345       //      cerr << "stop holding for process " << slave << endl;
00346         int dummy=1;
00347         MPI::COMM_WORLD.Send(&dummy, length, MPI::INT, 
00348                              slave, STOP_HOLD_TAG);
00349         hold[slave] = false;
00350         //      cerr << "Slave " << slave << " has hold for " 
00351         //           << cpu_time() - hold_time[slave] << " seconds" << endl;
00352 
00353     }
00354   }
00355       
00356   cerr << "Master ("<<myid<<"): has done "<<n_exp << " experiments " <<endl;
00357   //  PRC(out.n_zone);PRC(n_hits);
00358   //  PRL(experiment.get_nhits(out.n_zone));
00359 
00360   return experiment.get_nhits(out.n_zone);
00361 
00362 }
00363 #else
00364 int master_process(sigma_out & out,
00365                    sigma_input &input, MPI_Datatype inputtype,
00366                    scatter_exp &experiment, MPI_Datatype scatter_exp_type) {
00367 
00368   input.n_rand_inc = get_n_rand() - input.n_rand; 
00369   int n_exp = 0;
00370 
00371   int master = 0;
00372   do {
00373     cerr << "Experiment (" << out.n_zone << ") #" << n_exp << ": ";
00374 
00375     input.n_rand += input.n_rand_inc;
00376     experiment.set_nzone(out.n_zone);
00377 
00378     real cpu_init = cpu_time();
00379     slave_process(master, 
00380                   input, inputtype, 
00381                   experiment, scatter_exp_type);
00382 
00383     cerr << " cpu= " << cpu_time() - cpu_init
00384          << "   Time = " << experiment.get_time() 
00385          << "   result: " << experiment.get_final_form() << endl;
00386     single_scatter_stats(&experiment, out);
00387 
00388     n_exp++;
00389   }
00390   while(n_exp < input.n_experiments);
00391 
00392   return experiment.get_nhits(out.n_zone);
00393 }
00394 #endif
00395 
00396 #endif

Generated at Sun Feb 24 09:57:10 2002 for STARLAB by doxygen 1.2.6, written by Dimitri van Heesch, © 1997-2001