"Paralelização" da Iteração de Gauss-Seidel
Introdução
Há fundamentalmente duas maneiras de paralelizar uma iteração de GS com a ordenação Red-Black.
A maneira mais natural para programadores não científicos que estão acostumados a trabalhar com
threads seria paralelizar diretamente o laço de atualização, como indicado no trecho abaixo.
| for it in range(maxiter):
uold = p.u.copy()
# Criação de threads
# Atribuição do pontos da cor atualizada a cada thread
# Update em paralelo para cada subconjunto de nós em cada thread
for i in prob_range:
p.u[i] = 0.5*(p.u[i-1] + p.u[i+1] - p.dx2*p.f[i])
err[it] = np.linalg.norm(uold - p.u)/np.linalg.norm(p.u)
if err[it] <= tol:
converged = True
break
|
O problema com este tipo de técnica é que existe um custo de criação e gerenciamento de threads que
não é trivial, e que pode ser amortizado com uma organização do programa mais compatível com o
que normalmente é feito na paralelização dos códigos de engenharia, onde se usa a decomposição
do domínio.
O que faremos então é particionar o domínio no número de threads que desejamos executar, criar
as threads, associar um subdomínio a cada thread e realizar toda a iteração em paralelo,
sincronizando quando necessário.
Faremos isto com a forma vetorial da implementação, é claro, por motivos de desempenho.
Partição do domínio
Em um problema mais sério, com uma geometria complexa representada por uma malha não estruturada,
o particionamento do domínio computacional pode ser complexo, mas há rotinas protas para fazer
isto, como o Parmetis,
PT-Scotch e outros.
No nosso caso o particionamento do domínio é relativamente trivial, já que basta divir os elementos
de um vetor em grupos consecutivos mais ou menos do mesmo tamanho, para tentar garantir o
balanceamento de carga.
Podemos ter uma primeira implementação desta ideia com a função abaixo.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | def split_domain_simple(size, nparts, bal=True):
"""
Returns indices indicating the beginning and end of a partition
of a 1D vector.
If bal is true, spreads the unbalanced itens through some of the threads,
otherwise lumps all of the in the last group. Only important if the vector
is very small.
"""
size_part, rest = divmod(size, nparts) # Partition the blocks
if bal:
part_sizes = [size_part]*(nparts-rest)+[(size_part+1)]*rest
else:
part_sizes = [size_part]*(nparts-1)+[(size_part+rest)]
limits = [(0, part_sizes[0])]
for size in part_sizes[1:]:
_, last = limits[-1]
limits.append((last, last+size))
return limits
|
Esta função retorna uma lista de pares com o início e o final de cada faixa que
deve ser atualizada por cada thread.
É interessante notar que atualizar o sistema na ordem Red-Black não é estritamente
essencial para a paralelização. A única dependência importante é entre domínios adjacentes,
poderíamos organizar a atualização dentro de cada domínio da forma que fosse mais conveniente.
No caso, a forma mais conveniente é exatamente a iteração Red-Black pois permite que
a solução seja vetorizada muito facilmente.
É util então modificar a função acima para retornar uma divisão do domínio em pares de pontos,
e, já que estamos aqui, podemos fazer uma que divide o domínio em blocos de um determinado
tamanho dado. A função abaixo faz isto.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | def split_domain(size, nparts, blk_size=1, bal=True):
"""
Returns indices indicating the beginning and end of a partition
of a 1D vector.
"""
nblks, fix = divmod(size, blk_size) # Partition into blocks
size_part, rest = divmod(nblks, nparts) # Partition the blocks
if bal:
part_sizes = [size_part*blk_size]*(nparts-rest)+\
[(size_part+1)*blk_size]*rest
else:
part_sizes = [size_part*blk_size]*(nparts-1)+[(size_part+rest)*blk_size]
limits = [(0, part_sizes[0])]
for size in part_sizes[1:]:
_, last = limits[-1]
limits.append((last, last+size))
if fix:
a, b = limits[-1]
limits[-1]= (a, b+fix)
return limits
|
Temos uma outra peculiaridade, no entanto.
Como os nós das extremidades do domínio nunca são atualizados, eles não precisam ser coloridos, nem entra na numeração Red-Black. Então, desejamos que a partição inicie
com um nó vermelho na posição 1, e termine um nó antes do último, então temos a última
versão, abaixo. Não se preocupe demais em entender isto, nem eu mesmo entendo.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 | """
"gstrheadsup.py"
Thread support functions
"""
def split_domain(size, nparts, blk_size=2, bal=True, bcs=False):
"""
Returns a list of pair of indices indicating the beginning and
end of a partition of a 1D vector.
size: number of points
nparts: number of partitions
blk_size: number of point on each color (2 for red_black)
bal: if True, at most one block of unbalance between partitions
bcs: if True, ignores the nodes at the extremities
"""
# Here be dragons.
if bcs:
size -= 2
nblks, fix = divmod(size, blk_size) # Partition into blocks
size_part, rest = divmod(nblks, nparts) # Partition the blocks
if bal:
part_sizes = [size_part*blk_size]*(nparts-rest)+\
[(size_part+1)*blk_size]*rest
else:
part_sizes = [size_part*blk_size]*(nparts-1)+\
[(size_part+rest)*blk_size]
first = 0 if not bcs else 1
limits = [(first, first+part_sizes[0])]
for size in part_sizes[1:]:
_, last = limits[-1]
limits.append((last, last+size))
if fix:
a, b = limits[-1]
limits[-1]= (a, b+fix)
return limits
|
Esta função é colocada em um módulo específico, gsthreadsup.py, para poder ser
empregada em vários contextos diferentes.
Nova Classe
Para facilitar a implementação, vamos definir uma nova classe para descrever um problema,
derivada da classe original, que inclui também o particionamento do domínio. Vamos mover
toda a definição do domínio computacional para um módulo específico, problem01.py, e mover
as coisas ligadas a particionamento e ordenação para este módulo.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 | """
"problem01.py"
Computational domain definitions.
"""
import numpy as np
from gssup04 import zero_rhs
import gsthreadsup as gst
#%%
class RBSlice:
"""
Returns array slices for Red-Black update.
"""
def __init__(self, size):
Mr = size-2
Mb = size-3 if size%2 else size-2
self.R = slice(1,Mr+1,2)
self.Rm = slice(0,Mr,2)
self.Rp = slice(2,Mr+2,2)
self.B = slice(2,Mb+1,2)
self.Bm = slice(1,Mb,2)
self.Bp = slice(3,Mb+2,2)
#%%
class Problem:
def __init__(self, L, nnode, U0=None, UL=None, calc_rhs=zero_rhs, **kwargs):
"""
L: float -- Length of the domain
nnode: int -- Number of nodes
U0, UL: float -- Dirichlet boundary conditions
calc_rhs function(x, L, *args) -- computes the rhs of the equation
"""
self.x = np.linspace(0, L, nnode)
self.dx = self.x[1] - self.x[0]
self.dx2 = self.dx*self.dx
self.u = np.zeros_like(self.x)
self.L = L
self.nn = nnode
if U0 is not None: self.u[0] = U0
if UL is not None: self.u[-1] = UL
self.f = calc_rhs(self.x, self.L, **kwargs)
self.hist = None
class PartitionedProblem(Problem):
"""
A problem, but partitioned.
"""
def __init__(self, L, nnode, U0=None, UL=None, calc_rhs=zero_rhs, npart=1,
**kwargs):
super().__init__(L, nnode, U0, UL, calc_rhs, **kwargs)
self.slcs = []
self.sp = []
for first, last in gst.split_domain(self.x.size, npart, bcs=True):
self.slcs.append(slice(first-1, last+1, 1))
self.sp.append(RBSlice(last-first+2))
def parts(self):
"""
Returns the partitions of the domain, a triple (up, fp, sp).
The views have overlapping extremes to work as ghost cells.
up: list of views of the unknown for each partition
fp: list of views of the rhs
sp: precomputed Red-Black slices for each partition
"""
up = [self.u[s] for s in self.slcs]
fp = [self.f[s] for s in self.slcs]
return (up, fp, self.sp)
|
O arquivo de apoio também tem que ser moderadamente reorganizado.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 | """
gssup05.py
This just organizes a few support functions for the GS iteration.
"""
import logging
import numpy as np
from math import pi
from itertools import chain
__format = "%(asctime)s: %(message)s"
logging.basicConfig(format=__format, level=logging.INFO, datefmt="%H:%M:%S")
#%%
def zero_rhs(x, L):
"""Sets up a null vector for the RHS of the equation"""
return np.zeros_like(x)
def sin_rhs(x, L, A):
"""Sets up a rhs that produces a single period of a senoidal function with
amplitude A as the solution."""
f = 2*pi/L
return -f*f*A*np.sin(f*x)
def parab_rhs(x, L, A):
"""
Untested rhs for parabolic solution.
"""
return np.full_like(x, 2*A/(L*L))
def cub_rhs(x, L, A):
"""Sets up rhs to have a cubic solution with zeros on 0, L/2, L/3
and value A on L.
Don't forget to set up the correct boundary conditions."""
return -9*(A*L - 3*A*x)/(L*L*L)
#%%
class RBRange:
"""
Returns a range **with step 1** in the red-black ordering,
skipping over 0 and n-1, pretty much just useful for GS
iterations.
"""
def __init__(self, size):
self._s = size
self.reset()
def __iter__(self):
return self
def reset(self):
r = range(self._s)
self.c = chain(r[1:-1:2], r[2:-1:2])
def __next__(self):
try:
return next(self.c)
except StopIteration:
self.reset()
raise StopIteration
def info(*args, **kwargs):
"""Directly logs info calls, 99% of what we're doing."""
logging.info(*args, **kwargs)
|
Iteração por Partições
Como o primeiro passo para a paralelização, vamos dividir o domínio computacional
em partições, ou blocos, e iterar em cada bloco, sequencialmente.
É interessante observar que algumas mudanças estruturais são necessárias, como, por
exemplo, não podemos calcular a norma do “erro” em uma única operação vetorial com
np.linalg.norm(), já que a raiz quadrada não é uma operação linear.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 | """
Sequential RB partitioned iteration.
It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""
import numpy as np
import gssup05 as gs
import problem01 as pm
import matplotlib.pyplot as plt
gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
npart = 12
probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.zero_rhs, npart=npart),
'sinusoidal': pm.PartitionedProblem(L, nn,
calc_rhs=gs.sin_rhs, A=A, npart=npart),
'cubic': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.cub_rhs, A=A, npart=npart)}
#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
gs.info("Solving {} problem.".format(name))
err = np.zeros(maxiter, dtype=np.float64)
converged = False
for it in range(maxiter):
err_num = 0.0
err_den = 0.0
up, fp, sp = p.parts()
for u, f, s in zip(up, fp, sp):
uold = u.copy()
u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B])
err_num += np.sum((uold - u)**2)
err_den += np.sum(u**2)
err[it] = np.sqrt(err_num/err_den)
if err[it] <= tol:
converged = True
gs.info('Problem {} converged after {} iterations'.\
format(name, it))
break
if not converged:
gs.info('Problem {} failed to converge after {} iterations'.\
format(name, it))
err = np.resize(err, it+1)
p.hist = err
figs = { 'linear': (0,3),
'sinusoidal': (1,4),
'cubic': (2,5)}
styles = {'rb': 'r+',
'rbv': 'b'}
for name in probs_rbv.keys():
plt.figure(figs[name][0])
plt.title("Solution for problem {}".format(name))
plt.figure(figs[name][1])
plt.yscale('log')
plt.title("{} problem convergence history".format(name))
# Plot graphs correctly
for name, p in probs_rbv.items():
plt.figure(figs[name][0])
plt.plot(p.x, p.u, styles['rbv'])
plt.figure(figs[name][1])
plt.plot(p.hist, styles['rbv'])
|
É interessante observar que a iteração por blocos mostrada acima não é
exatamente idêntica àquela mostrada anteriormente.
Podemos verifica isto executando o programa acima para, por exemplo,
nn = 101, tol = 5e-5 e npart = 12, obtemos o seguinte relatório
ao final da execução.
| 18:17:53: Setting up problems.
18:17:53: Running partitioned RB iteration.
18:17:53: Solving linear problem.
Reloaded modules: gssup05, problem01, gssup04, gsthreadsup
18:17:54: Problem linear converged after 2721 iterations
18:17:54: Solving sinusoidal problem.
18:17:54: Problem sinusoidal converged after 1082 iterations
18:17:54: Solving cubic problem.
18:17:54: Problem cubic converged after 1060 iterations
|
Executando a iteração vetorial direta, não particionada, mostrada anteriormente, obtemos
o seguinte:
| 18:19:25: Running vector iteration.
18:19:25: Solving linear problem.
18:19:25: Problem linear converged after 2800 iterations
18:19:25: Solving sinusoidal problem.
18:19:25: Problem sinusoidal converged after 1110 iterations
18:19:25: Solving cubic problem.
18:19:25: Problem cubic converged after 1070 iterations
|
É curioso ver que a iteração particionada converge um pouco mais rapidamente
do que a direta para o caso linear. Provavelmente isto se deve ao fato de que
a ordenação que acontece com a iteração particionada tende a trazer o efeito
da condição de contorno um pouco mais rapidamente do que a iteração direta.
Este diagnóstico ainda é completamente especulativo, no entanto.
Fatorando a Atualização
O próximo passo para a paralelização é encapsular a atualização em uma função que possa
ser executada com facilidade por um executor.
O programa abaixo tem essa modificação.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 | """
Sequential RB partitioned iteration.
It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""
import numpy as np
import gssup05 as gs
import problem01 as pm
import matplotlib.pyplot as plt
gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
npart = 12
probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.zero_rhs, npart=npart),
'sinusoidal': pm.PartitionedProblem(L, nn,
calc_rhs=gs.sin_rhs, A=A, npart=npart),
'cubic': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.cub_rhs, A=A, npart=npart)}
#%%
def par_update(u, f, s):
uold = u.copy()
u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B])
err_num = np.sum((uold - u)**2)
err_den = np.sum(u**2)
return err_num, err_den
#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
gs.info("Solving {} problem.".format(name))
err = np.zeros(maxiter, dtype=np.float64)
converged = False
for it in range(maxiter):
err_num = 0.0
err_den = 0.0
up, fp, sp = p.parts()
for u, f, s in zip(up, fp, sp):
en, ed = par_update(u, f, s)
err_num += en
err_den += ed
err[it] = np.sqrt(err_num/err_den)
if err[it] <= tol:
converged = True
gs.info('Problem {} converged after {} iterations'.\
format(name, it))
break
if not converged:
gs.info('Problem {} failed to converge after {} iterations'.\
format(name, it))
err = np.resize(err, it+1)
p.hist = err
figs = { 'linear': (0,3),
'sinusoidal': (1,4),
'cubic': (2,5)}
styles = {'rb': 'r+',
'rbv': 'b'}
for name in probs_rbv.keys():
plt.figure(figs[name][0])
plt.title("Solution for problem {}".format(name))
plt.figure(figs[name][1])
plt.yscale('log')
plt.title("{} problem convergence history".format(name))
# Plot graphs correctly
for name, p in probs_rbv.items():
plt.figure(figs[name][0])
plt.plot(p.x, p.u, styles['rbv'])
plt.figure(figs[name][1])
plt.plot(p.hist, styles['rbv'])
|
Implementação “Paralela”
Agora basta apenas executar a função de atualização em paralelo. Temos que garantir
que as atualizações de cada cor não interfiram com a outra cor. Por isto dividimos
o domínio sempre terminando em uma cor preta e começando com uma cor vermelha,
e vamos introduzir uma chamada de sincronização dentro da função de atualização
(barrier.wait()), para garantir que a atualização da cor preta só comece quando a vermelha
acabar.
Uma implementação melhor faria a sincronização entre os vizinhos apenas, mas como o número
de threads é muito pequeno, isto não é realmente importante.
Não é necessário sincronizar a atualização da cor preta, já que está implícita pelo fim
dos threads.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 | """
Sequential RB partitioned iteration.
It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""
import numpy as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier
import matplotlib.pyplot as plt
gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
nthread = 4
probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.zero_rhs, npart=nthread),
'sinusoidal': pm.PartitionedProblem(L, nn,
calc_rhs=gs.sin_rhs, A=A, npart=nthread),
'cubic': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.cub_rhs, A=A, npart=nthread)}
#%%
class Updater():
def __init__(self, nthread):
self.barrier = Barrier(nthread)
def par_update(self, u, f, s):
pass
uold = u.copy()
u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
self.barrier.wait()
u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B])
err_num = np.sum((uold - u)**2)
err_den = np.sum(u**2)
return err_num, err_den
#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
updater = Updater(nthread)
for name, p in probs_rbv.items():
gs.info("Solving {} problem.".format(name))
err = np.zeros(maxiter, dtype=np.float64)
converged = False
for it in range(maxiter):
err_num = 0.0
err_den = 0.0
up, fp, sp = p.parts()
with ThreadPoolExecutor() as executor:
res = executor.map(updater.par_update, up, fp, sp)
for en, ed in res:
err_num += en
err_den += ed
err[it] = np.sqrt(err_num/err_den)
if err[it] <= tol:
converged = True
gs.info('Problem {} converged after {} iterations'.\
format(name, it))
break
if not converged:
gs.info('Problem {} failed to converge after {} iterations'.\
format(name, it))
err = np.resize(err, it+1)
p.hist = err
figs = { 'linear': (0,3),
'sinusoidal': (1,4),
'cubic': (2,5)}
styles = {'rb': 'r+',
'rbv': 'b'}
for name in probs_rbv.keys():
plt.figure(figs[name][0])
plt.title("Solution for problem {}".format(name))
plt.figure(figs[name][1])
plt.yscale('log')
plt.title("{} problem convergence history".format(name))
# Plot graphs correctly
for name, p in probs_rbv.items():
plt.figure(figs[name][0])
plt.plot(p.x, p.u, styles['rbv'])
plt.figure(figs[name][1])
plt.plot(p.hist, styles['rbv'])
|
Paralelização por subdomníos
A implementação acima de certa forma não forma do problema que descrevemos no começo
desta seção, a criação de um novo thread para cada partição em cada iteração.
Na maioria dos sistemas operacionais e linguagens de programação, a criação de threads é muito
rápida, particularmente quando comparada ao custo de atualização de cada partição, e isto não
seria um problema real.
É interessante, no entanto, em preparação para a implementação paralela com processos que não
compartilham memória, desenvolver uma implementação na qual é criado um único threado para cada
subdomínio, que realiza todas as iterações e sincroniza quando necessário com os vizinhos.
Isto torna o programa um pouco mais complicado, pois agora é necessário garantir o acesso exclusivo
a variáveis compartilhadas e computar resultados com informações que estão sendo computadas por
threads distintos.
Uma implementação desta técnica está mostrada abaixo. Há várias coisas interessantes nesta
implementação:
- Toda a lógica de atualização foi movida para uma classe, onde as variáveis compartilhadas
por todos os threads são membros do objeto, e as variáveis replicadas em cada thread são
as variáveis locais da função membro
par_update().
- O cálculo da norma e a verificação da convergência são feitos por um único thread, os
outros esperam e apenas leem o resultado deste teste.
- Cada thread calcula a parcela correspondente da norma do erro e a guarda em uma posição
própria de um vetor compartilhado por todos os threads, então não há necessidade de locks.
- Ou todos os threads convergem, ou ninguém converge, todos os threads retornam um valor apenas
por simetria, isto não é estritamente necessário.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 | import numpy as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier
import matplotlib.pyplot as plt
gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 22
maxiter = 10000
tol = 5e-5
nthread = 4
probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.zero_rhs, npart=nthread),
'sinusoidal': pm.PartitionedProblem(L, nn,
calc_rhs=gs.sin_rhs, A=A, npart=nthread),
'cubic': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.cub_rhs, A=A, npart=nthread)}
#%%
class Updater():
def __init__(self, nthread, tol):
"""
Performs parallel iterations on all subdomains.
nthread: number of threads to launch
tol: iterative update tolerance
"""
# These are all shared among all threads
self.tol = tol
self.barrier = Barrier(nthread)
self.err = np.zeros(maxiter, dtype=np.float64)
self.err_num = 0.0
self.err_den = 0.0
self.err_num_t = np.zeros(nthread)
self.err_den_t = np.zeros(nthread)
def par_update(self, u, f, s, thread_idx):
"""
Run the iterarions for each subdomain in parallel
u: view of the partition for the unknown
f: view of the partitions for the rhs
s: RB orderings for this partition
thread_idx: this thread id
"""
converged = False
for it in range(maxiter):
uold = u.copy()
u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
self.barrier.wait()
u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B])
self.err_num_t[thread_idx] = np.sum((uold - u)**2)
self.err_den_t[thread_idx] = np.sum(u**2)
self.barrier.wait()
if thread_idx == 0:
en = np.sum(self.err_num_t)
ed = np.sum(self.err_den_t)
self.err[it] = np.sqrt(en/ed)
self.barrier.wait() # This one was hard af.
if self.err[it] <= self.tol:
converged = True
break
return converged, it+1
##%% Does a single thread for each partition from beginning to the end
updater = Updater(nthread, tol)
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
gs.info("Solving {} problem.".format(name))
up, fp, sp = p.parts()
with ThreadPoolExecutor() as executor:
res = executor.map(updater.par_update, up, fp, sp, range(nthread))
for conv, nit in res:
if not conv:
gs.info('Problem {} failed to converge after {} iterations'.\
format(name, nit))
gs.info('Problem {} converged after {} iterations'.\
format(name, nit))
# either everyones did or no one did
p.hist = updater.err[:nit].copy()
break
figs = { 'linear': (0,3),
'sinusoidal': (1,4),
'cubic': (2,5)}
styles = {'rb': 'r+',
'rbv': 'b'}
for name in probs_rbv.keys():
plt.figure(figs[name][0])
plt.title("Solution for problem {}".format(name))
plt.figure(figs[name][1])
plt.yscale('log')
plt.title("{} problem convergence history".format(name))
# Plot graphs correctly
for name, p in probs_rbv.items():
plt.figure(figs[name][0])
plt.plot(p.x, p.u, styles['rbv'])
plt.figure(figs[name][1])
plt.plot(p.hist, styles['rbv'])
|
O cálculo da norma do erro no programa está feito de uma forma muito mais característica de um
programa que segue o modelo de memória distribuída do que um implementado segundo o
modelo de memória compartilhada. Fizemos isto propositalmente, por compatibilidade, mas
é interessante ver como seria a implementação mais clássica de um modelo de memória compartilhada.
A principal diferença é que os resultados das acumulações mostradas nas linhas 66 e 67 acima
seriam guardados diretamente em variável compartilhadas por todos os threads, protegidas
por um lock para evitar conflitos de acesso. A implementação a seguir tem esta modificação, mas
a implementação anterior continua nossa preferida.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 | import numpy as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier, Lock
import matplotlib.pyplot as plt
gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 22
maxiter = 10000
tol = 5e-5
nthread = 4
probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.zero_rhs, npart=nthread),
'sinusoidal': pm.PartitionedProblem(L, nn,
calc_rhs=gs.sin_rhs, A=A, npart=nthread),
'cubic': pm.PartitionedProblem(L, nn, UL=A,
calc_rhs=gs.cub_rhs, A=A, npart=nthread)}
#%%
class Updater():
def __init__(self, nthread, tol):
"""
Performs parallel iterations on all subdomains.
nthread: number of threads to launch
tol: iterative update tolerance
"""
# These are all shared among all threads
self.tol = tol
self.barrier = Barrier(nthread)
self.lock = Lock()
self.err = np.zeros(maxiter, dtype=np.float64)
self.err_num = 0.0
self.err_den = 0.0
def par_update(self, u, f, s, thread_idx):
"""
Run the iterarions for each subdomain in parallel
u: view of the partition for the unknown
f: view of the partitions for the rhs
s: RB orderings for this partition
thread_idx: this thread id
"""
converged = False
for it in range(maxiter):
uold = u.copy()
u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
self.barrier.wait()
u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B])
err_num_t = np.sum((uold - u)**2)
err_den_t = np.sum(u**2)
with self.lock:
self.err_num += err_num_t
self.err_den += err_den_t
self.barrier.wait()
if thread_idx == 0:
self.err[it] = np.sqrt(self.err_num/self.err_den)
self.err_num = 0.0
self.err_den = 0.0
self.barrier.wait() # This one was hard af.
if self.err[it] <= self.tol:
converged = True
break
return converged, it+1
##%% Does a single thread for each partition from beginning to the end
updater = Updater(nthread, tol)
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
gs.info("Solving {} problem.".format(name))
up, fp, sp = p.parts()
with ThreadPoolExecutor() as executor:
res = executor.map(updater.par_update, up, fp, sp, range(nthread))
for conv, nit in res:
if not conv:
gs.info('Problem {} failed to converge after {} iterations'.\
format(name, nit))
gs.info('Problem {} converged after {} iterations'.\
format(name, nit))
# either everyones did or no one did
p.hist = updater.err[:nit].copy()
break
figs = { 'linear': (0,3),
'sinusoidal': (1,4),
'cubic': (2,5)}
styles = {'rb': 'r+',
'rbv': 'b'}
for name in probs_rbv.keys():
plt.figure(figs[name][0])
plt.title("Solution for problem {}".format(name))
plt.figure(figs[name][1])
plt.yscale('log')
plt.title("{} problem convergence history".format(name))
# Plot graphs correctly
for name, p in probs_rbv.items():
plt.figure(figs[name][0])
plt.plot(p.x, p.u, styles['rbv'])
plt.figure(figs[name][1])
plt.plot(p.hist, styles['rbv'])
|