/ramiro/demec/ufpe

"Paralelização" da Iteração de Gauss-Seidel

Introdução

Há fundamentalmente duas maneiras de paralelizar1 uma iteração de GS com a ordenação Red-Black.

A maneira mais natural para programadores não científicos que estão acostumados a trabalhar com threads seria paralelizar diretamente o laço de atualização, como indicado no trecho abaixo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
for it in range(maxiter):
    uold = p.u.copy()
    # Criação de threads
    # Atribuição do pontos da cor atualizada a cada thread
    # Update em paralelo para cada subconjunto de nós em cada thread
    for i in prob_range:
        p.u[i] = 0.5*(p.u[i-1] + p.u[i+1] - p.dx2*p.f[i])  
    err[it] = np.linalg.norm(uold - p.u)/np.linalg.norm(p.u)
    if err[it] <= tol:
        converged = True
        break

O problema com este tipo de técnica é que existe um custo de criação e gerenciamento de threads que não é trivial, e que pode ser amortizado com uma organização do programa mais compatível com o que normalmente é feito na paralelização dos códigos de engenharia, onde se usa a decomposição do domínio.

O que faremos então é particionar o domínio no número de threads que desejamos executar, criar as threads, associar um subdomínio a cada thread e realizar toda a iteração em paralelo, sincronizando quando necessário.

Faremos isto com a forma vetorial da implementação, é claro, por motivos de desempenho.

Partição do domínio

Em um problema mais sério, com uma geometria complexa representada por uma malha não estruturada, o particionamento do domínio computacional pode ser complexo, mas há rotinas protas para fazer isto, como o Parmetis, PT-Scotch e outros.

No nosso caso o particionamento do domínio é relativamente trivial, já que basta divir os elementos de um vetor em grupos consecutivos mais ou menos do mesmo tamanho, para tentar garantir o balanceamento de carga.

Podemos ter uma primeira implementação desta ideia com a função abaixo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def split_domain_simple(size, nparts, bal=True):
    """
    Returns indices indicating the beginning and end of a partition
    of a 1D vector.

    If bal is true, spreads the unbalanced itens through some of the threads,
    otherwise lumps all of the in the last group. Only important if the vector
    is very small.
    """
    size_part, rest = divmod(size, nparts)          # Partition the blocks
    if bal:
        part_sizes = [size_part]*(nparts-rest)+[(size_part+1)]*rest
    else:
        part_sizes = [size_part]*(nparts-1)+[(size_part+rest)]

    limits = [(0, part_sizes[0])]
    for size in part_sizes[1:]:
        _, last = limits[-1]
        limits.append((last, last+size))
    return limits

Esta função retorna uma lista de pares com o início e o final de cada faixa que deve ser atualizada por cada thread.

É interessante notar que atualizar o sistema na ordem Red-Black não é estritamente essencial para a paralelização. A única dependência importante é entre domínios adjacentes, poderíamos organizar a atualização dentro de cada domínio da forma que fosse mais conveniente.

No caso, a forma mais conveniente é exatamente a iteração Red-Black pois permite que a solução seja vetorizada muito facilmente.

É util então modificar a função acima para retornar uma divisão do domínio em pares de pontos, e, já que estamos aqui, podemos fazer uma que divide o domínio em blocos de um determinado tamanho dado. A função abaixo faz isto.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def split_domain(size, nparts, blk_size=1, bal=True):
    """
    Returns indices indicating the beginning and end of a partition
    of a 1D vector.
    """
    nblks, fix = divmod(size, blk_size)               # Partition into blocks
    size_part, rest = divmod(nblks, nparts)          # Partition the blocks
    if bal:
        part_sizes = [size_part*blk_size]*(nparts-rest)+\
                                            [(size_part+1)*blk_size]*rest
    else:
        part_sizes = [size_part*blk_size]*(nparts-1)+[(size_part+rest)*blk_size]

    limits = [(0, part_sizes[0])]
    for size in part_sizes[1:]:
        _, last = limits[-1]
        limits.append((last, last+size))
    if fix:
        a, b = limits[-1]
        limits[-1]= (a, b+fix)
    return limits

Temos uma outra peculiaridade, no entanto. Como os nós das extremidades do domínio nunca são atualizados, eles não precisam ser coloridos, nem entra na numeração Red-Black. Então, desejamos que a partição inicie com um nó vermelho na posição 1, e termine um nó antes do último, então temos a última versão, abaixo. Não se preocupe demais em entender isto, nem eu mesmo entendo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
"""
"gstrheadsup.py"

Thread support functions
"""

def split_domain(size, nparts, blk_size=2, bal=True, bcs=False):
    """
    Returns a list of pair of indices indicating the beginning and
    end of a partition of a 1D vector.

    size: number of points
    nparts: number of partitions
    blk_size: number of point on each color (2 for red_black)
    bal: if True, at most one block of unbalance between partitions
    bcs: if True, ignores the nodes at the extremities
    """

    # Here be dragons.
    if bcs:
        size -= 2
    nblks, fix = divmod(size, blk_size)              # Partition into blocks
    size_part, rest = divmod(nblks, nparts)          # Partition the blocks
    if bal:
        part_sizes = [size_part*blk_size]*(nparts-rest)+\
                                            [(size_part+1)*blk_size]*rest
    else:
        part_sizes = [size_part*blk_size]*(nparts-1)+\
                                                    [(size_part+rest)*blk_size]
    first = 0 if not bcs else 1
    limits = [(first, first+part_sizes[0])]
    for size in part_sizes[1:]:
        _, last = limits[-1]
        limits.append((last, last+size))
    if fix:
        a, b = limits[-1]
        limits[-1]= (a, b+fix)
    return limits

Esta função é colocada em um módulo específico, gsthreadsup.py, para poder ser empregada em vários contextos diferentes.

Nova Classe

Para facilitar a implementação, vamos definir uma nova classe para descrever um problema, derivada da classe original, que inclui também o particionamento do domínio. Vamos mover toda a definição do domínio computacional para um módulo específico, problem01.py, e mover as coisas ligadas a particionamento e ordenação para este módulo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""

"problem01.py"

Computational domain definitions.
"""
import numpy as np
from gssup04 import zero_rhs
import gsthreadsup as gst

#%%
class RBSlice:
    """
    Returns array slices for Red-Black update.
    """
    def __init__(self, size):
        Mr = size-2
        Mb = size-3 if size%2 else size-2
        self.R = slice(1,Mr+1,2)
        self.Rm = slice(0,Mr,2)
        self.Rp = slice(2,Mr+2,2)
        self.B = slice(2,Mb+1,2)
        self.Bm = slice(1,Mb,2)
        self.Bp = slice(3,Mb+2,2)

#%%
class Problem:
    def __init__(self, L, nnode, U0=None, UL=None, calc_rhs=zero_rhs, **kwargs):
        """
        L: float      -- Length of the domain
        nnode: int    -- Number of nodes
        U0, UL: float -- Dirichlet boundary conditions
        calc_rhs function(x, L, *args) -- computes the rhs of the equation
        """
        self.x = np.linspace(0, L, nnode)
        self.dx = self.x[1] - self.x[0]
        self.dx2 = self.dx*self.dx
        self.u = np.zeros_like(self.x)
        self.L = L
        self.nn = nnode
        if U0 is not None: self.u[0] = U0 
        if UL is not None: self.u[-1] = UL 
        self.f = calc_rhs(self.x, self.L, **kwargs)
        self.hist = None

class PartitionedProblem(Problem):
    """
    A problem, but partitioned.
    """
    def __init__(self, L, nnode, U0=None, UL=None, calc_rhs=zero_rhs, npart=1,
                 **kwargs):
        super().__init__(L, nnode, U0, UL, calc_rhs, **kwargs)
        self.slcs = []
        self.sp = []
        for first, last in gst.split_domain(self.x.size, npart, bcs=True):
            self.slcs.append(slice(first-1, last+1, 1))
            self.sp.append(RBSlice(last-first+2))

    def parts(self):
        """
         Returns the partitions of the domain, a triple (up, fp, sp).
        The views have overlapping extremes to work as ghost cells.

        up: list of views of the unknown for each partition
        fp: list of views of the rhs
        sp: precomputed Red-Black slices for each partition
        """   
        up = [self.u[s] for s in self.slcs]
        fp = [self.f[s] for s in self.slcs]
        return (up, fp, self.sp)

O arquivo de apoio também tem que ser moderadamente reorganizado.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
gssup05.py

This just organizes a few support functions for the GS iteration.
"""

import logging
import numpy       as     np
from   math        import pi
from   itertools   import chain

__format = "%(asctime)s: %(message)s"
logging.basicConfig(format=__format, level=logging.INFO, datefmt="%H:%M:%S")

#%%
def zero_rhs(x, L):
    """Sets up a null vector for the RHS of the equation"""
    return np.zeros_like(x)

def sin_rhs(x, L, A):
    """Sets up a rhs that produces a single period of a senoidal function with
       amplitude A as the solution."""
    f = 2*pi/L
    return -f*f*A*np.sin(f*x)

def parab_rhs(x, L, A):
    """
    Untested rhs for parabolic solution.
    """
    return np.full_like(x, 2*A/(L*L))

def cub_rhs(x, L, A):
    """Sets up rhs to have a cubic solution with zeros on 0, L/2, L/3
       and value A on L.
       Don't forget to set up the correct boundary conditions."""
    return -9*(A*L - 3*A*x)/(L*L*L)

#%%
class RBRange:
    """
    Returns a range **with step 1** in the red-black ordering,
    skipping over 0 and n-1, pretty much just useful for GS
    iterations.
    """
    def __init__(self, size):
        self._s = size
        self.reset()

    def __iter__(self):
        return self

    def reset(self):
        r = range(self._s)
        self.c = chain(r[1:-1:2], r[2:-1:2])

    def __next__(self):
        try:
           return next(self.c)
        except StopIteration:
            self.reset()
            raise StopIteration

def info(*args, **kwargs):
    """Directly logs info calls, 99% of what we're doing."""
    logging.info(*args, **kwargs)

Iteração por Partições

Como o primeiro passo para a paralelização, vamos dividir o domínio computacional em partições, ou blocos, e iterar em cada bloco, sequencialmente.

É interessante observar que algumas mudanças estruturais são necessárias, como, por exemplo, não podemos calcular a norma do “erro” em uma única operação vetorial com np.linalg.norm(), já que a raiz quadrada não é uma operação linear.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
Sequential RB partitioned iteration.

It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""

import numpy  as np
import gssup05 as gs
import problem01 as pm
import matplotlib.pyplot as plt

gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
npart = 12

probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
                                            calc_rhs=gs.zero_rhs, npart=npart),
          'sinusoidal': pm.PartitionedProblem(L, nn,
                                          calc_rhs=gs.sin_rhs, A=A, npart=npart),
               'cubic': pm.PartitionedProblem(L, nn, UL=A,
                                         calc_rhs=gs.cub_rhs, A=A, npart=npart)}

#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
    gs.info("Solving {} problem.".format(name))
    err = np.zeros(maxiter, dtype=np.float64)
    converged = False
    for it in range(maxiter):
        err_num = 0.0
        err_den = 0.0
        up, fp, sp = p.parts()
        for u, f, s in zip(up, fp, sp):
            uold = u.copy()
            u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
            u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B]) 
            err_num  += np.sum((uold - u)**2)
            err_den  += np.sum(u**2)
        err[it] = np.sqrt(err_num/err_den)
        if err[it] <= tol:
            converged = True
            gs.info('Problem {} converged after {} iterations'.\
                                                          format(name, it))
            break
    if not converged:
        gs.info('Problem {} failed to converge after {} iterations'.\
                                                          format(name, it))
    err = np.resize(err, it+1)
    p.hist = err

figs = { 'linear': (0,3),
     'sinusoidal': (1,4),
          'cubic': (2,5)}

styles = {'rb': 'r+',
         'rbv': 'b'}

for name in probs_rbv.keys():
    plt.figure(figs[name][0])
    plt.title("Solution for problem {}".format(name))
    plt.figure(figs[name][1])
    plt.yscale('log')
    plt.title("{} problem convergence history".format(name))

# Plot graphs correctly
for name, p in probs_rbv.items():
    plt.figure(figs[name][0])
    plt.plot(p.x, p.u, styles['rbv'])     
    plt.figure(figs[name][1])
    plt.plot(p.hist, styles['rbv'])    

É interessante observar que a iteração por blocos mostrada acima não é exatamente idêntica àquela mostrada anteriormente.

Podemos verifica isto executando o programa acima para, por exemplo, nn = 101, tol = 5e-5 e npart = 12, obtemos o seguinte relatório ao final da execução.

1
2
3
4
5
6
7
8
9
18:17:53: Setting up problems.
18:17:53: Running partitioned RB iteration.
18:17:53: Solving linear problem.
Reloaded modules: gssup05, problem01, gssup04, gsthreadsup
18:17:54: Problem linear converged after 2721 iterations
18:17:54: Solving sinusoidal problem.
18:17:54: Problem sinusoidal converged after 1082 iterations
18:17:54: Solving cubic problem.
18:17:54: Problem cubic converged after 1060 iterations

Executando a iteração vetorial direta, não particionada, mostrada anteriormente, obtemos o seguinte:

1
2
3
4
5
6
7
18:19:25: Running vector iteration.
18:19:25: Solving linear problem.
18:19:25: Problem linear converged after 2800 iterations
18:19:25: Solving sinusoidal problem.
18:19:25: Problem sinusoidal converged after 1110 iterations
18:19:25: Solving cubic problem.
18:19:25: Problem cubic converged after 1070 iterations

É curioso ver que a iteração particionada converge um pouco mais rapidamente do que a direta para o caso linear. Provavelmente isto se deve ao fato de que a ordenação que acontece com a iteração particionada tende a trazer o efeito da condição de contorno um pouco mais rapidamente do que a iteração direta. Este diagnóstico ainda é completamente especulativo, no entanto.

Fatorando a Atualização

O próximo passo para a paralelização é encapsular a atualização em uma função que possa ser executada com facilidade por um executor.

O programa abaixo tem essa modificação.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
Sequential RB partitioned iteration.

It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""

import numpy  as np
import gssup05 as gs
import problem01 as pm
import matplotlib.pyplot as plt

gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
npart = 12

probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
                                            calc_rhs=gs.zero_rhs, npart=npart),
          'sinusoidal': pm.PartitionedProblem(L, nn,
                                         calc_rhs=gs.sin_rhs, A=A, npart=npart),
               'cubic': pm.PartitionedProblem(L, nn, UL=A,
                                         calc_rhs=gs.cub_rhs, A=A, npart=npart)}

#%%
def par_update(u, f, s):
    uold = u.copy()
    u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
    u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B]) 
    err_num = np.sum((uold - u)**2)
    err_den = np.sum(u**2)
    return err_num, err_den

#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
    gs.info("Solving {} problem.".format(name))
    err = np.zeros(maxiter, dtype=np.float64)
    converged = False
    for it in range(maxiter):
        err_num = 0.0
        err_den = 0.0
        up, fp, sp = p.parts()
        for u, f, s in zip(up, fp, sp):
            en, ed = par_update(u, f, s)
            err_num  += en
            err_den  += ed
        err[it] = np.sqrt(err_num/err_den)
        if err[it] <= tol:
            converged = True
            gs.info('Problem {} converged after {} iterations'.\
                                                          format(name, it))
            break
    if not converged:
        gs.info('Problem {} failed to converge after {} iterations'.\
                                                          format(name, it))
    err = np.resize(err, it+1)
    p.hist = err

figs = { 'linear': (0,3),
     'sinusoidal': (1,4),
          'cubic': (2,5)}

styles = {'rb': 'r+',
         'rbv': 'b'}

for name in probs_rbv.keys():
    plt.figure(figs[name][0])
    plt.title("Solution for problem {}".format(name))
    plt.figure(figs[name][1])
    plt.yscale('log')
    plt.title("{} problem convergence history".format(name))

# Plot graphs correctly
for name, p in probs_rbv.items():
    plt.figure(figs[name][0])
    plt.plot(p.x, p.u, styles['rbv'])     
    plt.figure(figs[name][1])
    plt.plot(p.hist, styles['rbv'])    

Implementação “Paralela”

Agora basta apenas executar a função de atualização em paralelo. Temos que garantir que as atualizações de cada cor não interfiram com a outra cor. Por isto dividimos o domínio sempre terminando em uma cor preta e começando com uma cor vermelha, e vamos introduzir uma chamada de sincronização dentro da função de atualização (barrier.wait()), para garantir que a atualização da cor preta só comece quando a vermelha acabar.

Uma implementação melhor faria a sincronização entre os vizinhos apenas, mas como o número de threads é muito pequeno, isto não é realmente importante.

Não é necessário sincronizar a atualização da cor preta, já que está implícita pelo fim dos threads.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
Sequential RB partitioned iteration.

It's in your best interest to make sure that there are enough
points left in a partition after the domain is split.
"""

import numpy  as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier

import matplotlib.pyplot as plt

gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 101
maxiter = 10000
tol = 5e-5
nthread = 4

probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
                                        calc_rhs=gs.zero_rhs, npart=nthread),
          'sinusoidal': pm.PartitionedProblem(L, nn,
                                    calc_rhs=gs.sin_rhs, A=A, npart=nthread),
               'cubic': pm.PartitionedProblem(L, nn, UL=A,
                                    calc_rhs=gs.cub_rhs, A=A, npart=nthread)}
#%%
class Updater():
    def __init__(self, nthread):
        self.barrier = Barrier(nthread)

    def par_update(self, u, f, s):
        pass
        uold = u.copy()
        u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
        self.barrier.wait()
        u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B]) 
        err_num = np.sum((uold - u)**2)
        err_den = np.sum(u**2)
        return err_num, err_den

#%% Partitioned Iterations
gs.info("Running partitioned RB iteration.")
updater = Updater(nthread)
for name, p in probs_rbv.items():
    gs.info("Solving {} problem.".format(name))
    err = np.zeros(maxiter, dtype=np.float64)
    converged = False
    for it in range(maxiter):
        err_num = 0.0
        err_den = 0.0
        up, fp, sp = p.parts()
        with ThreadPoolExecutor() as executor:
            res = executor.map(updater.par_update, up, fp, sp)
            for en, ed in res:
                err_num  += en
                err_den  += ed
        err[it] = np.sqrt(err_num/err_den)
        if err[it] <= tol:
            converged = True
            gs.info('Problem {} converged after {} iterations'.\
                                                          format(name, it))
            break
    if not converged:
        gs.info('Problem {} failed to converge after {} iterations'.\
                                                          format(name, it))
    err = np.resize(err, it+1)
    p.hist = err

figs = { 'linear': (0,3),
     'sinusoidal': (1,4),
          'cubic': (2,5)}

styles = {'rb': 'r+',
         'rbv': 'b'}

for name in probs_rbv.keys():
    plt.figure(figs[name][0])
    plt.title("Solution for problem {}".format(name))
    plt.figure(figs[name][1])
    plt.yscale('log')
    plt.title("{} problem convergence history".format(name))

# Plot graphs correctly
for name, p in probs_rbv.items():
    plt.figure(figs[name][0])
    plt.plot(p.x, p.u, styles['rbv'])     
    plt.figure(figs[name][1])
    plt.plot(p.hist, styles['rbv'])    

Paralelização por subdomníos

A implementação acima de certa forma não forma do problema que descrevemos no começo desta seção, a criação de um novo thread para cada partição em cada iteração.

Na maioria dos sistemas operacionais e linguagens de programação, a criação de threads é muito rápida, particularmente quando comparada ao custo de atualização de cada partição, e isto não seria um problema real.

É interessante, no entanto, em preparação para a implementação paralela com processos que não compartilham memória, desenvolver uma implementação na qual é criado um único threado para cada subdomínio, que realiza todas as iterações e sincroniza quando necessário com os vizinhos.

Isto torna o programa um pouco mais complicado, pois agora é necessário garantir o acesso exclusivo a variáveis compartilhadas e computar resultados com informações que estão sendo computadas por threads distintos.

Uma implementação desta técnica está mostrada abaixo. Há várias coisas interessantes nesta implementação:

  1. Toda a lógica de atualização foi movida para uma classe, onde as variáveis compartilhadas por todos os threads são membros do objeto, e as variáveis replicadas em cada thread são as variáveis locais da função membro par_update().
  2. O cálculo da norma e a verificação da convergência são feitos por um único thread, os outros esperam e apenas leem o resultado deste teste.
  3. Cada thread calcula a parcela correspondente da norma do erro e a guarda em uma posição própria de um vetor compartilhado por todos os threads, então não há necessidade de locks.
  4. Ou todos os threads convergem, ou ninguém converge, todos os threads retornam um valor apenas por simetria, isto não é estritamente necessário.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
import numpy  as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier

import matplotlib.pyplot as plt


gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 22
maxiter = 10000
tol = 5e-5
nthread = 4

probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
                                        calc_rhs=gs.zero_rhs, npart=nthread),
          'sinusoidal': pm.PartitionedProblem(L, nn,
                                    calc_rhs=gs.sin_rhs, A=A, npart=nthread),
               'cubic': pm.PartitionedProblem(L, nn, UL=A,
                                   calc_rhs=gs.cub_rhs, A=A, npart=nthread)}

#%%
class Updater():
    def __init__(self, nthread, tol):
        """
        Performs parallel iterations on all subdomains.

        nthread: number of threads to launch
        tol:    iterative update tolerance
        """
        # These are all shared among all threads
        self.tol = tol
        self.barrier = Barrier(nthread)

        self.err = np.zeros(maxiter, dtype=np.float64)
        self.err_num = 0.0
        self.err_den = 0.0
        self.err_num_t = np.zeros(nthread)
        self.err_den_t = np.zeros(nthread)

    def par_update(self, u, f, s, thread_idx):
        """
        Run the iterarions for each subdomain in parallel

        u: view of the partition for the unknown
        f: view of the partitions for the rhs
        s: RB orderings for this partition
        thread_idx: this thread id
        """  
        converged = False
        for it in range(maxiter):
            uold = u.copy()
            u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
            self.barrier.wait()
            u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B]) 
            self.err_num_t[thread_idx] = np.sum((uold - u)**2)
            self.err_den_t[thread_idx] = np.sum(u**2)
            self.barrier.wait()
            if thread_idx == 0:
                en = np.sum(self.err_num_t)
                ed = np.sum(self.err_den_t)
                self.err[it] = np.sqrt(en/ed)
            self.barrier.wait()                 # This one was hard af.
            if self.err[it] <= self.tol:
                converged = True
                break
        return converged, it+1

##%%  Does a single thread for each partition from beginning to the end
updater = Updater(nthread, tol)
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
    gs.info("Solving {} problem.".format(name))
    up, fp, sp = p.parts()
    with ThreadPoolExecutor() as executor:
        res = executor.map(updater.par_update, up, fp, sp, range(nthread))
        for conv, nit in res:
            if not conv:                
                gs.info('Problem {} failed to converge after {} iterations'.\
                    format(name, nit))
            gs.info('Problem {} converged after {} iterations'.\
                                                          format(name, nit))
            # either everyones did or no one did
            p.hist = updater.err[:nit].copy()
            break

figs = { 'linear': (0,3),
     'sinusoidal': (1,4),
          'cubic': (2,5)}

styles = {'rb': 'r+',
         'rbv': 'b'}

for name in probs_rbv.keys():
    plt.figure(figs[name][0])
    plt.title("Solution for problem {}".format(name))
    plt.figure(figs[name][1])
    plt.yscale('log')
    plt.title("{} problem convergence history".format(name))

# Plot graphs correctly
for name, p in probs_rbv.items():
    plt.figure(figs[name][0])
    plt.plot(p.x, p.u, styles['rbv'])     
    plt.figure(figs[name][1])
    plt.plot(p.hist, styles['rbv'])    

O cálculo da norma do erro no programa está feito de uma forma muito mais característica de um programa que segue o modelo de memória distribuída do que um implementado segundo o modelo de memória compartilhada. Fizemos isto propositalmente, por compatibilidade, mas é interessante ver como seria a implementação mais clássica de um modelo de memória compartilhada.

A principal diferença é que os resultados das acumulações mostradas nas linhas 66 e 67 acima seriam guardados diretamente em variável compartilhadas por todos os threads, protegidas por um lock para evitar conflitos de acesso. A implementação a seguir tem esta modificação, mas a implementação anterior continua nossa preferida.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
import numpy  as np
import gssup05 as gs
import problem01 as pm
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier, Lock

import matplotlib.pyplot as plt


gs.info("Setting up problems.")
A = 4.0
L = 3.0
nn = 22
maxiter = 10000
tol = 5e-5
nthread = 4

probs_rbv = { 'linear': pm.PartitionedProblem(L, nn, UL=A,
                                        calc_rhs=gs.zero_rhs, npart=nthread),
          'sinusoidal': pm.PartitionedProblem(L, nn,
                                    calc_rhs=gs.sin_rhs, A=A, npart=nthread),
               'cubic': pm.PartitionedProblem(L, nn, UL=A,
                                   calc_rhs=gs.cub_rhs, A=A, npart=nthread)}

#%%
class Updater():
    def __init__(self, nthread, tol):
        """
        Performs parallel iterations on all subdomains.

        nthread: number of threads to launch
        tol:    iterative update tolerance
        """
        # These are all shared among all threads
        self.tol = tol
        self.barrier = Barrier(nthread)
        self.lock = Lock()

        self.err = np.zeros(maxiter, dtype=np.float64)
        self.err_num = 0.0
        self.err_den = 0.0

    def par_update(self, u, f, s, thread_idx):
        """
        Run the iterarions for each subdomain in parallel

        u: view of the partition for the unknown
        f: view of the partitions for the rhs
        s: RB orderings for this partition
        thread_idx: this thread id
        """  
        converged = False
        for it in range(maxiter):
            uold = u.copy()
            u[s.R] = 0.5*(u[s.Rm]+u[s.Rp] - p.dx2*f[s.R])
            self.barrier.wait()
            u[s.B] = 0.5*(u[s.Bm]+u[s.Bp] - p.dx2*f[s.B]) 
            err_num_t = np.sum((uold - u)**2)
            err_den_t = np.sum(u**2)
            with self.lock:
                self.err_num += err_num_t 
                self.err_den += err_den_t
            self.barrier.wait()
            if thread_idx == 0:
                self.err[it] = np.sqrt(self.err_num/self.err_den)
                self.err_num = 0.0
                self.err_den = 0.0
            self.barrier.wait()                 # This one was hard af.
            if self.err[it] <= self.tol:
                converged = True
                break
        return converged, it+1

##%%  Does a single thread for each partition from beginning to the end
updater = Updater(nthread, tol)
gs.info("Running partitioned RB iteration.")
for name, p in probs_rbv.items():
    gs.info("Solving {} problem.".format(name))
    up, fp, sp = p.parts()
    with ThreadPoolExecutor() as executor:
        res = executor.map(updater.par_update, up, fp, sp, range(nthread))
        for conv, nit in res:
            if not conv:                
                gs.info('Problem {} failed to converge after {} iterations'.\
                    format(name, nit))
            gs.info('Problem {} converged after {} iterations'.\
                                                          format(name, nit))
            # either everyones did or no one did
            p.hist = updater.err[:nit].copy()
            break

figs = { 'linear': (0,3),
     'sinusoidal': (1,4),
          'cubic': (2,5)}

styles = {'rb': 'r+',
         'rbv': 'b'}

for name in probs_rbv.keys():
    plt.figure(figs[name][0])
    plt.title("Solution for problem {}".format(name))
    plt.figure(figs[name][1])
    plt.yscale('log')
    plt.title("{} problem convergence history".format(name))

# Plot graphs correctly
for name, p in probs_rbv.items():
    plt.figure(figs[name][0])
    plt.plot(p.x, p.u, styles['rbv'])     
    plt.figure(figs[name][1])
    plt.plot(p.hist, styles['rbv'])    

  1. Lembrando que não adianta nada em termos de desempenho