#include #include "../shared/util.h" #include "../../include/ctf.hpp" tCTF_ScheduleBase* global_schedule; template void tCTF_Schedule::record() { global_schedule = this; } template inline void tCTF_Schedule::schedule_op_successors(tCTF_TensorOperation* op) { assert(op->dependency_left == 0); typename std::vector* >::iterator it; for (it=op->successors.begin(); it!=op->successors.end(); it++) { (*it)->dependency_left--; assert((*it)->dependency_left >= 0); if ((*it)->dependency_left == 0) { ready_tasks.push_back(*it); } } } /** * \brief Data structure containing what each partition is going to do. */ template struct tCTF_PartitionOps { int color; tCTF_World* world; std::vector*> ops; // operations to execute std::set*> local_tensors; // all local tensors used std::map*, tCTF_Tensor*> remap; // mapping from global tensor -> local tensor std::set*> global_tensors; // all referenced tensors stored as global tensors std::set*> output_tensors; // tensors to be written back out, stored as global tensors }; template void tCTF_Schedule::partition_and_execute() { int rank, size; MPI_Comm_rank(world->comm, &rank); MPI_Comm_size(world->comm, &size); // Partition operations into worlds, and do split std::vector > comm_ops; // operations for each subcomm int my_color = rank % ready_tasks.size(); int total_colors = size <= ready_tasks.size()? size : ready_tasks.size(); MPI_Comm my_comm; MPI_Comm_split(world->comm, my_color, rank, &my_comm); for (int color=0; color()); comm_ops[color].color = color; if (color == my_color) { comm_ops[color].world = new tCTF_World(my_comm); } else { comm_ops[color].world = NULL; } // dummy partitioning for now // TODO: better partitioning approach comm_ops[color].ops.push_back(ready_tasks.front()); ready_tasks.pop_front(); } // Initialize local data structures for (auto &comm_op : comm_ops) { // gather required tensors for (auto &op : comm_op.ops) { op->get_inputs(&comm_op.global_tensors); op->get_outputs(&comm_op.global_tensors); op->get_outputs(&comm_op.output_tensors); } } // Create and communicate tensors to subworlds for (auto &comm_op : comm_ops) { for (auto &global_tensor : comm_op.global_tensors) { tCTF_Tensor* local_clone; if (comm_op.world != NULL) { local_clone = new tCTF_Tensor(*global_tensor, *comm_op.world); } else { local_clone = NULL; } comm_op.local_tensors.insert(local_clone); comm_op.remap[global_tensor] = local_clone; global_tensor->add_to_subworld(local_clone, 1, 0); } for (auto &output_tensor : comm_op.output_tensors) { assert(comm_op.remap.find(output_tensor) != comm_op.remap.end()); } } // Run my tasks if (comm_ops.size() > my_color) { for (auto &op : comm_ops[my_color].ops) { std::cout << rank << "Exec " << op->name() << std::endl; op->execute(&comm_ops[my_color].remap); } } // Communicate results back into global for (auto &comm_op : comm_ops) { for (auto &output_tensor : comm_op.output_tensors) { output_tensor->add_from_subworld(comm_op.remap[output_tensor], 1, 0); } } // Clean up local tensors & world if (comm_ops.size() > my_color) { for (auto &local_tensor : comm_ops[my_color].local_tensors) { delete local_tensor; } delete comm_ops[my_color].world; } // Update ready tasks for (auto &comm_op : comm_ops) { for (auto &op : comm_op.ops) { schedule_op_successors(op); } } } /* // The dead simple scheduler template void tCTF_Schedule::partition_and_execute() { while (ready_tasks.size() >= 1) { tCTF_TensorOperation* op = ready_tasks.front(); ready_tasks.pop_front(); op->execute(); schedule_op_successors(op); } } */ template void tCTF_Schedule::execute() { srand (time(NULL)); global_schedule = NULL; typename std::deque*>::iterator it; // Initialize all tasks & initial ready queue for (it = steps_original.begin(); it != steps_original.end(); it++) { (*it)->dependency_left = (*it)->dependency_count; } ready_tasks = root_tasks; // Preprocess dummy operations while (!ready_tasks.empty()) { if (ready_tasks.front()->is_dummy()) { schedule_op_successors(ready_tasks.front()); ready_tasks.pop_front(); } else { break; } } while (!ready_tasks.empty()) { int rank; MPI_Comm_rank(world->comm, &rank); if (rank == 0) { std::cout << "Partition" << std::endl; } partition_and_execute(); } } template void tCTF_Schedule::add_operation_typed(tCTF_TensorOperation* op) { steps_original.push_back(op); std::set*> op_lhs_set; op->get_outputs(&op_lhs_set); assert(op_lhs_set.size() == 1); // limited case to make this a bit easier tCTF_Tensor* op_lhs = *op_lhs_set.begin(); std::set*> op_deps; op->get_inputs(&op_deps); typename std::set*>::iterator deps_iter; for (deps_iter = op_deps.begin(); deps_iter != op_deps.end(); deps_iter++) { tCTF_Tensor* dep = *deps_iter; typename std::map*, tCTF_TensorOperation*>::iterator dep_loc = latest_write.find(dep); tCTF_TensorOperation* dep_op; if (dep_loc != latest_write.end()) { dep_op = dep_loc->second; } else { // create dummy operation to serve as a root dependency // TODO: this can be optimized away dep_op = new tCTF_TensorOperation(TENSOR_OP_NONE, NULL, NULL); latest_write[dep] = dep_op; root_tasks.push_back(dep_op); steps_original.push_back(dep_op); } dep_op->successors.push_back(op); dep_op->reads.push_back(op); op->dependency_count++; } typename std::map*, tCTF_TensorOperation*>::iterator prev_loc = latest_write.find(op_lhs); if (prev_loc != latest_write.end()) { // if there was a previous write, add its dependencies to my dependencies // to ensure that I don't clobber values that a ready dependency needs std::vector*>* prev_reads = &(prev_loc->second->reads); typename std::vector*>::iterator prev_iter; for (prev_iter = prev_reads->begin(); prev_iter != prev_reads->end(); prev_iter++) { if (*prev_iter != op) { (*prev_iter)->successors.push_back(op); op->dependency_count++; } } } latest_write[op_lhs] = op; } template void tCTF_Schedule::add_operation(tCTF_TensorOperationBase* op) { tCTF_TensorOperation* op_typed = dynamic_cast* >(op); assert(op_typed != NULL); add_operation_typed(op_typed); } template void tCTF_TensorOperation::execute(std::map*, tCTF_Tensor*>* remap) { assert(global_schedule == NULL); // ensure this isn't going into a record() tCTF_Idx_Tensor* remapped_lhs = lhs; const tCTF_Term* remapped_rhs = rhs; if (remap != NULL) { remapped_lhs = dynamic_cast* >(remapped_lhs->clone(remap)); assert(remapped_lhs != NULL); remapped_rhs = remapped_rhs->clone(remap); } switch (op) { case TENSOR_OP_NONE: break; case TENSOR_OP_SET: *remapped_lhs = *remapped_rhs; break; case TENSOR_OP_SUM: *remapped_lhs += *remapped_rhs; break; case TENSOR_OP_SUBTRACT: *remapped_lhs -= *remapped_rhs; break; case TENSOR_OP_MULTIPLY: *remapped_lhs *= *remapped_rhs; break; default: std::cerr << "tCTF_TensorOperation::execute(): unexpected op: " << op << std::endl; assert(false); } } template void tCTF_TensorOperation::get_outputs(std::set*>* outputs_set) const { assert(lhs->parent->name); outputs_set->insert(lhs->parent); } template void tCTF_TensorOperation::get_inputs(std::set*>* inputs_set) const { rhs->get_inputs(inputs_set); switch (op) { case TENSOR_OP_SET: break; case TENSOR_OP_SUM: case TENSOR_OP_SUBTRACT: case TENSOR_OP_MULTIPLY: if (lhs->parent) { inputs_set->insert(lhs->parent); } break; default: std::cerr << "tCTF_TensorOperation::get_inputs(): unexpected op: " << op << std::endl; assert(false); } } template class tCTF_Schedule; #ifdef CTF_COMPLEX template class tCTF_Schedule< std::complex >; #endif