# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import copyreg
import multiprocessing
import time
import types
from multiprocessing import Pool
import numpy as np
def _pickle_method(method):
"""function for multiprocessing in class
References:
method (class): Method
"""
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
if func_name.startswith(
'__') and not func_name.endswith('__'): # deal with mangled names
cls_name = cls.__name__.lstrip('_')
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
"""
function for multiprocessing in class
"""
for cls in cls.__mro__:
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
# Funcion inversa a la anterior
[docs]
def separate_from_iterable(iterable, shape=0):
"""This function does somehow the opposite of the previous one, it takes an iterable made of lists and separates each one in a different variable, reshaped with the desired shape
"""
# Averiguar el numero de variables diferentes que habra
N_var = len(iterable[0])
# Make iterable array
iterable = np.array(iterable)
# Cambiar la forma de las variables
variables = range(N_var)
for indV in range(N_var):
if shape == 0:
variables[indV] = iterable[:, indV]
else:
variables[indV] = np.reshape(iterable[:, indV], shape)
# Fin
return variables
[docs]
class auxiliar_multiprocessing(object):
def __init__(self):
pass
# Method that executes the multiprocessing
[docs]
def execute_multiprocessing(self,
function,
var_iterable,
dict_constants=dict(),
Ncores=8):
# Store data in object
self.external_function = function
self.dict_constants = dict_constants
# Start multiprocessing if more than one core is required
if Ncores > 1:
pool = Pool(Ncores)
print('Starting multiprocessing')
result = pool.map(self.method_single_proc, var_iterable)
print('Multiprocessing finished')
pool.close()
pool.join()
# When only one core is asked, don't go to multiprocessing
else:
N = len(var_iterable)
result = range(N)
print('Starting process in only 1 core')
for ind, elem in enumerate(var_iterable):
result[ind] = function(elem, dict_constants)
# Save and extract resultado
self.resultado = result
return result
[docs]
def method_single_proc(self, elem_iterable):
# Method that is called in each iteration of the multiprocessing
return self.external_function(elem_iterable, self.dict_constants)
[docs]
def execute_multiprocessing(__function_process__,
dict_Parameters,
num_processors,
verbose=False):
"""Executes multiprocessing reading a dictionary.
Parameters:
__function_process__ function tu process, it only accepts a dictionary
dict_Parameters, dictionary / array with Parameters:
num_processors, if 1 no multiprocessing is used
verbose, prints processing time
Returns:
data: reults of multiprocessing
processing time
Examples:
def __function_process__(xd):
x = xd['x']
y = xd['y']
# grt = copy.deepcopy(grating)
suma = x + y
return dict(sumas=suma, ij=xd['ij'])
def creation_dictionary_multiprocessing():
# create Parameters: for multiprocessing
t1 = time.time()
X = np.linspace(1, 2, 10)
Y = np.linspace(1, 2, 1000)
dict_Parameters = []
ij = 0
for i, x in enumerate(X):
for j, y in enumerate(Y):
dict_Parameters.append(dict(x=x, y=y, ij=[ij]))
ij += 1
t2 = time.time()
print("time creation dictionary = {}".format(t2 - t1))
return dict_Parameters
"""
t1 = time.time()
if num_processors == 1 or len(dict_Parameters) < 2:
data_pool = [__function_process__(xd) for xd in dict_Parameters]
else:
pool = multiprocessing.Pool(processes=num_processors)
data_pool = pool.map(__function_process__, dict_Parameters)
pool.close()
pool.join()
t2 = time.time()
if verbose is True:
print("num_proc: {}, time={}".format(num_processors, t2 - t1))
return data_pool, t2 - t1