===== Class work 2 ===== Below you can find the code discussed in class and relative to: * the farm solution * the farm-map solution * the pipe solution * the pipe-discard solution /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* *************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * As a special exception, you may use this file as part of a free software * library without restriction. Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. * **************************************************************************** */ /* * Author: Massimo Torquati * Date: November 2014 */ // // Toy example to test FastFlow farm. // It finds the prime numbers in the range (n1,n2) using a farm + feedback. 
// // |----> Worker -- // Emitter --|----> Worker --|-- // ^ |----> Worker -- | // |__________________________| // // - The Emitter generates all number between n1 and n2 and then waits // to receive primes from the Workers. // - The Worker check if the number is prime (by using the is_prime function), // if yes then it sends the prime number back to the Emitter. // // Usage example: // ./farm_primes 1900 2000 3 | tr -s ' ' '\n' | sort -n #include #include #include using namespace ff; // see http://en.wikipedia.org/wiki/Primality_test bool is_prime(unsigned long long n) { if (n <= 3) return n > 1; // 1 is not prime ! if (n % 2 == 0 || n % 3 == 0) return false; for (unsigned long long i = 5; i * i <= n; i += 6) { if (n % i == 0 || n % (i + 2) == 0) return false; } return true; } struct Emitter: ff_node_t { Emitter(ff_loadbalancer *lb, unsigned long long n1, unsigned long long n2) :lb(lb),n1(n1),n2(n2) {} long *svc(long *x) { if (x == nullptr) { // first time unsigned long long n; while( (n=n1++) <= n2 ) ff_send_out((void*)n); lb->broadcast_task(EOS); return GO_ON; } std::cout << reinterpret_cast(x) << " "; return GO_ON; } ff_loadbalancer *const lb; unsigned long long n1,n2; }; struct Worker: ff_node_t { long *svc(long *in) { if (is_prime(reinterpret_cast(in))) return in; return GO_ON; } }; int main(int argc, char *argv[]) { if (argc<4) { std::cerr << "use: " << argv[0] << " number1 number2 nworkers\n"; return -1; } unsigned long long n1 = atoll(argv[1]); unsigned long long n2 = atoll(argv[2]); const int nw = atoi(argv[3]); std::vector W; for(int i=0;i farm(W); // by default adds an emitter and a collector Emitter E(farm.getlb(), n1,n2); farm.add_emitter(&E); // replacing the default emitter farm.remove_collector(); // removing the default collector farm.wrap_around(); // adds feedback channel between worker and emitter farm.run_and_wait_end(); std::cout << "\n\n"; return 0; } /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* 
*************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * As a special exception, you may use this file as part of a free software * library without restriction. Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. 
* **************************************************************************** */ /* * Author: Massimo Torquati * Date: November 2014 */ // SPM class exercise (Class Work 2) // Consider the possibility to list all prime numbers in an interval [MIn, MAX] by: // // - split the interval in a number of given subintervals nw (n workers) // - assig each subinterval to a farm worker // - the worker // for each i in the interval // checks if i is prime if yes it packs the value in an array // finally the array is sent to the gatherer thread // - the gatherer threads gets all results and produces an ordered lists // of all primes // // (primality check in each partition) // // |----> Worker -->| // Emitter --|----> Worker -->|--Collector // |----> Worker -->| // (splits input array (gathers all sub-partitions) // in partitions) // #include #include #include using namespace ff; // see http://en.wikipedia.org/wiki/Primality_test bool is_prime(unsigned long long n) { if (n <= 3) return n > 1; // 1 is not prime ! if (n % 2 == 0 || n % 3 == 0) return false; for (unsigned long long i = 5; i * i <= n; i += 6) { if (n % i == 0 || n % (i + 2) == 0) return false; } return true; } // stream type struct Task_t { Task_t(unsigned long long n1, unsigned long long n2):n1(n1),n2(n2) {} const unsigned long long n1, n2; // input range std::vector V; // output data }; // splits the range [n1,n2] in nw sub-ranges each one of size ~(n2-n1)/nw struct Emitter: ff_node_t { Emitter(const ff_loadbalancer *lb, unsigned long long n1, unsigned long long n2) :lb(lb),n1(n1),n2(n2) {} Task_t *svc(Task_t *) { const int nw = lb->getNWorkers(); // gets the total number of workers added to the farm const size_t size = (n2 - n1) / nw; ssize_t more = (n2-n1) % nw; unsigned long long start = n1; unsigned long long stop = n1; for(int i=0; i0 ? 
1:0); --more; Task_t *task = new Task_t(start, stop); lb->ff_send_out_to(task, i); } return EOS; // broadcast EOS to all workers } const ff_loadbalancer *lb; unsigned long long n1, n2; }; struct Worker: ff_node_t { Task_t *svc(Task_t *in) { auto &V = in->V; unsigned long long n, n1(in->n1), n2(in->n2); while( (n=n1++) < n2 ) if (is_prime(n)) V.push_back(n); return in; } }; struct Collector: ff_node_t { Task_t *svc(Task_t *in) { auto V = in->V; if (V.size()) // I may receive empty sub-partitions primes.insert(std::upper_bound(primes.begin(), primes.end(), V[0]), V.begin(), V.end()); delete in; return GO_ON; } void svc_end() { for(size_t i=0;i primes; }; int main(int argc, char *argv[]) { if (argc<4) { std::cerr << "use: " << argv[0] << " number1 number2 nworkers\n"; return -1; } unsigned long long n1 = atoll(argv[1]); unsigned long long n2 = atoll(argv[2]); const int nw = atoi(argv[3]); std::vector W; for(int i=0;i farm(W, nullptr, &C); Emitter E(farm.getlb(), n1,n2); farm.add_emitter(&E); // replacing default emitter farm.cleanup_workers(); if (farm.run_and_wait_end()<0) error("running farm\n"); return 0; } /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* *************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
* * As a special exception, you may use this file as part of a free software * library without restriction. Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. * **************************************************************************** */ /* * Author: Massimo Torquati * Date: November 2014 */ // // Toy example to test FastFlow pipeline. // It finds the prime numbers up to n using a pipeline of filtering stages. // Please remember that in FastFlow pipeline stages are threads ! // Example: // pipe_primes 16 // // generator ---> stage(2) ---> stage(3) ---> stage(4) // ^ | // |___________________________________________| // // - The number of stages is floor(sqrt(n)), generator included. // - stage(i) sends out only those numbers x such that (x == i or (x % i) != 0) // - the generator first generates all numbers, then prints all primes received // in input from the last stage. 
// #include #include #include #include using namespace ff; // stream generator struct generator: ff_node { generator(long n):n(n) {} void *svc(void *x) { if (x == nullptr) { // first time for(long x=2;x<=n;++x) ff_send_out((void*)x); ff_send_out(EOS); return GO_ON; } std::cout << reinterpret_cast(x) << " "; return GO_ON; } long n; }; // generic stage struct stage: ff_node_t { int svc_init() { myid = get_my_id() + 1 ; return 0; } long *svc(long *t) { long x = (long)t; if (x == myid) return t; if (x % myid) return t; return GO_ON; } long myid; }; int main(int argc, char *argv[]) { if (argc<2) { std::cerr << "use: " << argv[0] << " number (not too big !)\n"; return -1; } long n = atol(argv[1]); assert(n>1); // The number of stages (i.e. threads) is floor(sqrt(n))+1 || ff_pipe pipe(new generator(n)); long sq = sqrt(n); for(long i=2;i<=sq;++i) pipe.add_stage(new stage); pipe.cleanup_nodes(); pipe.wrap_around(); pipe.run_and_wait_end(); std::cout << "\n\n"; return 0; } /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* *************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * As a special exception, you may use this file as part of a free software * library without restriction. 
Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. * **************************************************************************** */ /* * Author: Massimo Torquati * Date: November 2014 */ // SPM class exercise (Class Work 2) // Set up a FastFlow pipeline filtering a stream of integers from 2 to MAX, // (MAX is a command line argument) such that: // - the first stage generates integers from 2 to MAX // - the i-th stage: // - if it does not have any number stored, stores the first number received // and does not forward it to the next stage // - if it has a number stored: // - passes the received number only if it is not a multiple of the stored integer // otherwise discards the received number // - when EOS is received, each stage prints its stored number (a prime number) // // // generator ---> stage(2) ---> stage(3) ---> stage(5)... 
-->discard // // #include #include #include #include using namespace ff; // stream generator struct generator: ff_node_t { generator(long n):n(n) {} long *svc(long *) { for(long x=2;x<=n;++x) ff_send_out(reinterpret_cast(x)); return EOS; } long n; }; // generic stage struct stage: ff_node_t { int svc_init() { storage = 0 ; return 0; } long *svc(long *t) { long x = (long)t; if (!storage) storage = x; if (x % storage) return t; return GO_ON; } void svc_end() { if (storage) std::cout << storage << " "; } long storage; }; // task discarder struct discarder: ff_node { void *svc(void *t) { return GO_ON; } }; int main(int argc, char *argv[]) { if (argc<2) { std::cerr << "use: " << argv[0] << " number (not too big !)\n"; return -1; } long n = atol(argv[1]); assert(n>1); ff_pipe pipe(new generator(n)); // adding the first stage for(long i=2;i<=n;++i) pipe.add_stage(new stage); // adding all "middle" stages pipe.add_stage(new discarder); // adding the last stage pipe.cleanup_nodes(); if (pipe.run_and_wait_end()<0) error("running pipe\n"); std::cout << "\n\n"; return 0; }