|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | +Created on Sat Aug 10 18:04:36 2019 |
| 4 | +
|
| 5 | +@author: Ruibzhan |
| 6 | +""" |
| 7 | + |
| 8 | + |
| 9 | +from mpi4py import MPI |
| 10 | +import paraHill |
| 11 | +import pickle |
| 12 | +import numpy as np |
| 13 | +from itertools import product |
| 14 | +import time |
| 15 | +import datetime |
| 16 | +import config |
| 17 | +from config import * |
| 18 | +#%% Comm set |
| 19 | +comm = MPI.COMM_WORLD |
| 20 | +my_rank = comm.Get_rank() |
| 21 | +n_processors = comm.Get_size() |
| 22 | +print("Processors found: ",n_processors) |
| 23 | +current_time = datetime.datetime.now() |
| 24 | +print("Time now at the beginning is: ", current_time) |
| 25 | + |
| 26 | +def scatter_list_to_processors(comm, data_list, n_processors): |
| 27 | + import math |
| 28 | + data_amount = len(data_list) |
| 29 | + heap_size = math.ceil(data_amount/(n_processors-1)) |
| 30 | + |
| 31 | + for pidx in range(1,n_processors): |
| 32 | + try: |
| 33 | + heap = data_list[heap_size*(pidx-1):heap_size*pidx] |
| 34 | + except: |
| 35 | + heap = data_list[heap_size*(pidx-1):] |
| 36 | + comm.send(heap,dest = pidx) |
| 37 | + |
| 38 | + return True |
| 39 | + |
| 40 | +def receive_from_processors_to_dict(comm, n_processors): |
| 41 | + # receives dicts, combine them and return |
| 42 | + feedback = dict() |
| 43 | + for pidx in range(1,n_processors): |
| 44 | + receved = comm.recv(source=pidx) |
| 45 | + feedback.update(receved) |
| 46 | + return feedback |
| 47 | + |
| 48 | +#start = time.time() |
| 49 | +#%% load data |
| 50 | +with open(args.init,'rb') as file: |
| 51 | + gene_names,dist_matr,init_map = pickle.load(file) |
| 52 | +# with open('./Synthetic/Init_Synth8000.pickle','rb') as file: |
| 53 | + # gene_names,dist_matr,init_map = pickle.load(file) |
| 54 | +Nn = len(gene_names) |
| 55 | + |
| 56 | +kk = args.num |
| 57 | + |
| 58 | +if init_map.shape[0] != init_map.shape[1]: |
| 59 | + raise ValueError("For now only square images are considered.") |
| 60 | +nn = init_map.shape[0] |
| 61 | +# Convert from 'F34' to int 34 |
| 62 | +init_map = np.char.strip(init_map.astype(str),'F').astype(int) |
| 63 | +map_in_int = init_map |
| 64 | +#%% |
| 65 | +corr_evol = [] |
| 66 | +if my_rank == 0: |
| 67 | + print("Initial corr: >>>",paraHill.universial_corr(dist_matr,map_in_int)) |
| 68 | + for n_iter in range(kk): |
| 69 | + # 9 initial coordinates. |
| 70 | + init_coords = [x for x in product([0,1,2],repeat = 2)] |
| 71 | + for init_coord in init_coords: |
| 72 | + # Update the mapping. |
| 73 | + broadcast_msg = map_in_int |
| 74 | + comm.bcast(broadcast_msg,root = 0) |
| 75 | + # generate the centroids |
| 76 | + xxx = [init_coord[0]+i*3 for i in range(int(nn/3)+1) if (init_coord[0]+i*3)<nn] |
| 77 | + yyy = [init_coord[1]+i*3 for i in range(int(nn/3)+1) if (init_coord[1]+i*3)<nn] |
| 78 | + centr_list = [x for x in product(xxx,yyy)] |
| 79 | + # Master send and recv |
| 80 | + scatter_list_to_processors(comm,centr_list,n_processors) |
| 81 | + swap_dict = receive_from_processors_to_dict(comm,n_processors) |
| 82 | + print(swap_dict) |
| 83 | + map_in_int = paraHill.execute_dict_swap(swap_dict, map_in_int) |
| 84 | + |
| 85 | + print(">",init_coord,"Corr:",paraHill.universial_corr(dist_matr,map_in_int)) |
| 86 | + |
| 87 | + print(">>>",n_iter,"Corr:",paraHill.universial_corr(dist_matr,map_in_int)) |
| 88 | + corr_evol.append(paraHill.universial_corr(dist_matr,map_in_int)) |
| 89 | + |
| 90 | + coords = np.array([[item[0] for item in np.where(map_in_int == ii)] for ii in range(Nn)]) |
| 91 | + with open(args.mapping,'wb') as file: |
| 92 | + pickle.dump([gene_names,coords,map_in_int],file) |
| 93 | + #print("Consumed time:",start - time.time()) |
| 94 | + Endtime = datetime.datetime.now() |
| 95 | + print("Time now at the end: ", Endtime) |
| 96 | + import pandas as pd |
| 97 | + pd.Series(corr_evol).to_csv(args.evolution) |
| 98 | +else: |
| 99 | + # other processors |
| 100 | + for n_iter in range(kk): |
| 101 | + broadcast_msg = init_map # just for a size |
| 102 | + |
| 103 | + # 9 initial Centroids |
| 104 | + for ii in range(9): |
| 105 | + #Update the mapping |
| 106 | + map_in_int = comm.bcast(broadcast_msg,root = 0) |
| 107 | + |
| 108 | + centr_list = comm.recv(source = 0) |
| 109 | + each_swap_dict = paraHill.evaluate_centroids_in_list(centr_list,dist_matr,map_in_int) |
| 110 | + comm.send(each_swap_dict,dest = 0) |
| 111 | + #result = dict() |
| 112 | + #for each in data: |
| 113 | + # result.update({each: -each}) |
| 114 | + #comm.send(result,dest = 0) |
| 115 | + |
| 116 | +MPI.Finalize |
0 commit comments