===== Class work 2 =====
Below you can find the code discussed in class for each of the following solutions:
* the farm solution
* the farm-map solution
* the pipe solution
* the pipe-discard solution
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* As a special exception, you may use this file as part of a free software
* library without restriction. Specifically, if other files instantiate
* templates or use macros or inline functions from this file, or you compile
* this file and link it with other files to produce an executable, this
* file does not by itself cause the resulting executable to be covered by
* the GNU General Public License. This exception does not however
* invalidate any other reasons why the executable file might be covered by
* the GNU General Public License.
*
****************************************************************************
*/
/*
* Author: Massimo Torquati
* Date: November 2014
*/
//
// Toy example to test FastFlow farm.
// It finds the prime numbers in the range [n1,n2] (bounds included) using a farm + feedback.
//
// |----> Worker --
// Emitter --|----> Worker --|--
// ^ |----> Worker -- |
// |__________________________|
//
// - The Emitter generates all numbers between n1 and n2 and then waits
// to receive primes from the Workers.
// - The Worker checks if the number is prime (by using the is_prime function),
// if yes then it sends the prime number back to the Emitter.
//
// Usage example:
// ./farm_primes 1900 2000 3 | tr -s ' ' '\n' | sort -n
#include
#include
#include
using namespace ff;
// see http://en.wikipedia.org/wiki/Primality_test
// Returns true iff n is a prime number.
//
// Trial division: once multiples of 2 and 3 have been ruled out, every
// remaining candidate divisor has the form 6k-1 or 6k+1, so the loop only
// probes i and i+2 for i = 5, 11, 17, ...
bool is_prime(unsigned long long n) {
    if (n <= 3) return n > 1;   // 0 and 1 are not prime, 2 and 3 are
    if (n % 2 == 0 || n % 3 == 0) return false;
    // "i <= n / i" is the overflow-safe spelling of "i * i <= n": for n close
    // to 2^64 the product wraps around and the loop would keep running well
    // past sqrt(n).
    for (unsigned long long i = 5; i <= n / i; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0)
            return false;
    }
    return true;
}
struct Emitter: ff_node_t {
Emitter(ff_loadbalancer *lb,
unsigned long long n1, unsigned long long n2)
:lb(lb),n1(n1),n2(n2) {}
long *svc(long *x) {
if (x == nullptr) { // first time
unsigned long long n;
while( (n=n1++) <= n2 ) ff_send_out((void*)n);
lb->broadcast_task(EOS);
return GO_ON;
}
std::cout << reinterpret_cast(x) << " ";
return GO_ON;
}
ff_loadbalancer *const lb;
unsigned long long n1,n2;
};
struct Worker: ff_node_t {
long *svc(long *in) {
if (is_prime(reinterpret_cast(in))) return in;
return GO_ON;
}
};
int main(int argc, char *argv[]) {
if (argc<4) {
std::cerr << "use: " << argv[0] << " number1 number2 nworkers\n";
return -1;
}
unsigned long long n1 = atoll(argv[1]);
unsigned long long n2 = atoll(argv[2]);
const int nw = atoi(argv[3]);
std::vector W;
for(int i=0;i farm(W); // by default adds an emitter and a collector
Emitter E(farm.getlb(), n1,n2);
farm.add_emitter(&E); // replacing the default emitter
farm.remove_collector(); // removing the default collector
farm.wrap_around(); // adds feedback channel between worker and emitter
farm.run_and_wait_end();
std::cout << "\n\n";
return 0;
}
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* As a special exception, you may use this file as part of a free software
* library without restriction. Specifically, if other files instantiate
* templates or use macros or inline functions from this file, or you compile
* this file and link it with other files to produce an executable, this
* file does not by itself cause the resulting executable to be covered by
* the GNU General Public License. This exception does not however
* invalidate any other reasons why the executable file might be covered by
* the GNU General Public License.
*
****************************************************************************
*/
/*
* Author: Massimo Torquati
* Date: November 2014
*/
// SPM class exercise (Class Work 2)
// Consider the possibility to list all prime numbers in an interval [MIN, MAX] by:
//
// - splitting the interval in a given number nw of subintervals (nw workers)
// - assigning each subinterval to a farm worker
// - the worker,
// for each i in the subinterval,
// checks if i is prime and, if so, packs the value in an array;
// finally the array is sent to the gatherer thread
// - the gatherer thread gets all results and produces an ordered list
// of all primes
//
// (primality check in each partition)
//
// |----> Worker -->|
// Emitter --|----> Worker -->|--Collector
// |----> Worker -->|
// (splits input array (gathers all sub-partitions)
// in partitions)
//
#include
#include
#include
using namespace ff;
// see http://en.wikipedia.org/wiki/Primality_test
// Returns true iff n is a prime number.
//
// Trial division: once multiples of 2 and 3 have been ruled out, every
// remaining candidate divisor has the form 6k-1 or 6k+1, so the loop only
// probes i and i+2 for i = 5, 11, 17, ...
bool is_prime(unsigned long long n) {
    if (n <= 3) return n > 1;   // 0 and 1 are not prime, 2 and 3 are
    if (n % 2 == 0 || n % 3 == 0) return false;
    // "i <= n / i" is the overflow-safe spelling of "i * i <= n": for n close
    // to 2^64 the product wraps around and the loop would keep running well
    // past sqrt(n).
    for (unsigned long long i = 5; i <= n / i; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0)
            return false;
    }
    return true;
}
// stream type
// Item travelling on the stream: a sub-range to scan plus the primes found in it.
// The Worker of this file scans n1 <= n < n2, i.e. n2 is NOT part of the sub-range.
struct Task_t {
    Task_t(unsigned long long n1, unsigned long long n2):n1(n1),n2(n2) {}
    const unsigned long long n1, n2;   // input range
    std::vector<unsigned long long> V; // output data
};
// splits the range [n1,n2] in nw sub-ranges each one of size ~(n2-n1)/nw
struct Emitter: ff_node_t {
Emitter(const ff_loadbalancer *lb,
unsigned long long n1, unsigned long long n2)
:lb(lb),n1(n1),n2(n2) {}
Task_t *svc(Task_t *) {
const int nw = lb->getNWorkers(); // gets the total number of workers added to the farm
const size_t size = (n2 - n1) / nw;
ssize_t more = (n2-n1) % nw;
unsigned long long start = n1;
unsigned long long stop = n1;
for(int i=0; i0 ? 1:0);
--more;
Task_t *task = new Task_t(start, stop);
lb->ff_send_out_to(task, i);
}
return EOS; // broadcast EOS to all workers
}
const ff_loadbalancer *lb;
unsigned long long n1, n2;
};
struct Worker: ff_node_t {
Task_t *svc(Task_t *in) {
auto &V = in->V;
unsigned long long n, n1(in->n1), n2(in->n2);
while( (n=n1++) < n2 ) if (is_prime(n)) V.push_back(n);
return in;
}
};
struct Collector: ff_node_t {
Task_t *svc(Task_t *in) {
auto V = in->V;
if (V.size()) // I may receive empty sub-partitions
primes.insert(std::upper_bound(primes.begin(), primes.end(), V[0]),
V.begin(), V.end());
delete in;
return GO_ON;
}
void svc_end() {
for(size_t i=0;i primes;
};
int main(int argc, char *argv[]) {
if (argc<4) {
std::cerr << "use: " << argv[0] << " number1 number2 nworkers\n";
return -1;
}
unsigned long long n1 = atoll(argv[1]);
unsigned long long n2 = atoll(argv[2]);
const int nw = atoi(argv[3]);
std::vector W;
for(int i=0;i farm(W, nullptr, &C);
Emitter E(farm.getlb(), n1,n2);
farm.add_emitter(&E); // replacing default emitter
farm.cleanup_workers();
if (farm.run_and_wait_end()<0) error("running farm\n");
return 0;
}
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* As a special exception, you may use this file as part of a free software
* library without restriction. Specifically, if other files instantiate
* templates or use macros or inline functions from this file, or you compile
* this file and link it with other files to produce an executable, this
* file does not by itself cause the resulting executable to be covered by
* the GNU General Public License. This exception does not however
* invalidate any other reasons why the executable file might be covered by
* the GNU General Public License.
*
****************************************************************************
*/
/*
* Author: Massimo Torquati
* Date: November 2014
*/
//
// Toy example to test FastFlow pipeline.
// It finds the prime numbers up to n using a pipeline of about sqrt(n) stages.
// Please remember that in FastFlow pipeline stages are threads !
// Example:
// pipe_primes 16
//
// generator ---> stage(2) ---> stage(3) ---> stage(4)
// ^ |
// |___________________________________________|
//
// - The number of stages is floor(sqrt(n))-1 (one for each i in [2, floor(sqrt(n))]) plus the generator.
// - stage(i) sends out only those numbers x such that (x == i or (x % i) != 0)
// - the generator first generates all numbers, then prints all primes received
// in input from the last stage.
//
#include
#include
#include
#include
using namespace ff;
// stream generator
// First pipeline stage, also the end point of the feedback channel.
// On its first activation (null input) it sends out all the integers in
// [2,n] followed by EOS; every later input is a value that came back
// through the feedback channel and is printed on stdout.
// Numbers are carried in the pointer value itself.
struct generator: ff_node {
    generator(long n):n(n) {}

    void *svc(void *x) {
        if (x == nullptr) { // first time
            // 'v' (not 'x'): do not shadow the svc() parameter
            for(long v=2;v<=n;++v) ff_send_out(reinterpret_cast<void*>(v));
            ff_send_out(EOS);
            return GO_ON;
        }
        std::cout << reinterpret_cast<long>(x) << " ";
        return GO_ON;
    }

    long n;   // upper bound (included) of the generated sequence
};
// generic stage
struct stage: ff_node_t {
int svc_init() { myid = get_my_id() + 1 ; return 0; }
long *svc(long *t) {
long x = (long)t;
if (x == myid) return t;
if (x % myid) return t;
return GO_ON;
}
long myid;
};
int main(int argc, char *argv[]) {
if (argc<2) {
std::cerr << "use: " << argv[0] << " number (not too big !)\n";
return -1;
}
long n = atol(argv[1]);
assert(n>1);
// The number of stages (i.e. threads) is floor(sqrt(n))+1 ||
ff_pipe pipe(new generator(n));
long sq = sqrt(n);
for(long i=2;i<=sq;++i) pipe.add_stage(new stage);
pipe.cleanup_nodes();
pipe.wrap_around();
pipe.run_and_wait_end();
std::cout << "\n\n";
return 0;
}
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* As a special exception, you may use this file as part of a free software
* library without restriction. Specifically, if other files instantiate
* templates or use macros or inline functions from this file, or you compile
* this file and link it with other files to produce an executable, this
* file does not by itself cause the resulting executable to be covered by
* the GNU General Public License. This exception does not however
* invalidate any other reasons why the executable file might be covered by
* the GNU General Public License.
*
****************************************************************************
*/
/*
* Author: Massimo Torquati
* Date: November 2014
*/
// SPM class exercise (Class Work 2)
// Set up a FastFlow pipeline filtering a stream of integers from 2 to MAX,
// (MAX is a command line argument) such that:
// - the first stage generates integers from 2 to MAX
// - the i-th stage:
// - if it does not have any number stored, stores the first number received
// and does not forward it to the next stage
// - if it has a number stored:
// - passes the received number only if it is not a multiple of the stored integer
// otherwise discards the received number
// - when EOS is received, each stage prints its stored number (a prime number)
//
//
// generator ---> stage(2) ---> stage(3) ---> stage(5)... -->discard
//
//
#include
#include
#include
#include
using namespace ff;
// stream generator
struct generator: ff_node_t {
generator(long n):n(n) {}
long *svc(long *) {
for(long x=2;x<=n;++x)
ff_send_out(reinterpret_cast(x));
return EOS;
}
long n;
};
// generic stage
struct stage: ff_node_t {
int svc_init() { storage = 0 ; return 0; }
long *svc(long *t) {
long x = (long)t;
if (!storage) storage = x;
if (x % storage) return t;
return GO_ON;
}
void svc_end() {
if (storage)
std::cout << storage << " ";
}
long storage;
};
// task discarder
// Last stage of the pipeline: returns GO_ON for every input it receives,
// so nothing is ever forwarded.
struct discarder: ff_node {
    void *svc(void *) {
        return GO_ON;
    }
};
int main(int argc, char *argv[]) {
if (argc<2) {
std::cerr << "use: " << argv[0] << " number (not too big !)\n";
return -1;
}
long n = atol(argv[1]);
assert(n>1);
ff_pipe pipe(new generator(n)); // adding the first stage
for(long i=2;i<=n;++i) pipe.add_stage(new stage); // adding all "middle" stages
pipe.add_stage(new discarder); // adding the last stage
pipe.cleanup_nodes();
if (pipe.run_and_wait_end()<0) error("running pipe\n");
std::cout << "\n\n";
return 0;
}