/ramiro/demec/ufpe

Multiprocessamento

Introdução

A linguagem Python padronizou a interface ao multiprocessamento com o módulo multiprocessing. Esta interface permite a criação a comunicação entre processos, criados local ou remotamente.

A interface deste módulo replica várias características da interface do módulo de Threads, mas também introduz muitas novas facilidades. Vamos seguir mais ou menos o roteiro que fizemos quando apresentamos threds, depois vamos mostrar as novidades associadas ao multiprocessamento.

É importante entender que a criação de processos e o gerenciamento da comunicação entre eles são características de todos os sistemas operacionais modernos, isto é feito há várias décadas, pode ser acessado de muitas linguagens de programação. Por exemplo, as operações básicas do unix tradicional ou do linux são baseadas no modelo fork and join. Este módulo simplesmente acrescenta uma interface melhor à esta funcionalidade.

Pré-requisitos

Não podemos fazer nada com multiprocessamento antes de importar o módulo de multiprocessamento, que é padrão nas versões mais novas da linguagem.

1
import multiprocessing as mp

Processos

A classe que representa um processo é chamada Process, sem muitas surpresas. Depois de criado, a execução de um processo deve ser iniciada com .start(), e podemos esperar seu término com .join(), exatamente com fazemos com threads.

Vamos ver alguns exemplos abaixo, mas antes vamos criar um módulo para colocarmos as funções de suporte, para diminuir o volume de código repetido em cada exemplo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
"""
mp_support.py

Support functions and objects for multiprocessing examples
"""

import logging

__format = "%(asctime)s: %(message)s"
logging.basicConfig(format=__format, level=logging.INFO, datefmt="%H:%M:%S")

def info(*args, **kwargs):
    """Directly logs info calls, 99% of what we're doing."""
    logging.info(*args, **kwargs)

Por enquanto só temos a função que faz o logging das mensagens, da mesma forma que para os exemplos de threading.

Vamos criar um processo, da maneira mais simples possível e praticamente identicamente ao que fizemos para a criação de threads. Tradicionalmente, chamamos o processo que cria os processos de processo pai, e os processos que são criados de processos filhos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import multiprocessing as mp
import mp_support as ms
import time

def run_thing(name, rest_for):
    ms.info("Started Process {}, sleeping for {}s".\
            format(name, rest_for))
    time.sleep(rest_for)
    ms.info("Process {} finished".format(name))

if __name__ == "__main__":
    p = mp.Process(target=run_thing, args=['proc_1', 10])
    p.start()
    p.join()

Como anteriormente, o código acima é uma maluquice, pois poderíamos ter simplesmente chamado a função diretamente. As vantagens começam a aparecer quando criamos múltiplos processo, o que vamos fazer mais tarde. Antes disto, vamos ver outras características desta classe.

Assim como com theads, podemos criar uma classe que herda da classe Process e redefinir o método .run().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import multiprocessing as mp
import mp_support as ms
import time

class RunThing(mp.Process):
    def __init__(self, name, rest_for):
        super().__init__()
        self._r = rest_for

    def run(self):
        ms.info("Started Process {}, sleeping for {}s".\
               format(self._n, self._r))
        time.sleep(self._r)
        ms.info("Process {} finished".format(self._n))

if __name__ == "__main__":
    p = RunThing('Proc_01', 8)
    p.start()
    p.join()

O que é mais conveniente depende do contexto e do hábito do programador.

Uma observação interessante é que, da mesma forma que Threads, os objetos da classe Process já tem uma membro .name, portanto este que definimos nos exemplos acima, (e todas as vezes que fizemos isto com threads) é redundante. Este membro é só uma conveniência e não tem nenhuma função semântica. Podemos simplificar um pouco o programa acima então,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing as mp
import mp_support as ms
import time

class RunThing(mp.Process):
    def __init__(self, name=None, rest_for=0):
        if name:
            super().__init__(name=name)
        else:
            super().__init__()

        self._r = rest_for

    def run(self):
        ms.info("Started Process {}, sleeping for {}s".\
               format(self.name, self._r))
        time.sleep(self._r)
        ms.info("Process {} finished".format(self.name))

if __name__ == "__main__":
    p = RunThing('Proc_01', 8)
    p.start()
    p.join()
    p = RunThing(rest_for=4)
    p.start()
    p.join()

Perceba que o Python gera um nome default caso você não atribua um.

Um objeto da classe Process tem a mesma assinatura que um objeto da classe Thread. Em particular, podemos verificar se um processo está vivo ou não.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing as mp
import mp_support as ms
import time

class RunThing(mp.Process):
    def __init__(self, rest_for=0):
        super().__init__()
        self._r = rest_for

    def run(self):
        ms.info("Started Process {}, sleeping for {}s".\
               format(self.name, self._r))
        time.sleep(self._r)
        ms.info("Process {} finished".format(self.name))

if __name__ == "__main__":
    procs = [ RunThing(rest) for rest in (12, 7, 4, 6) ]
    for p in procs:
        p.start()
    for p in procs:
        ms.info("{} is_alive?: {}".format(p.name, p.is_alive()))
    for p in procs:
        p.join()

Também podemos ver no programa acima que os processos são objetos normais que podem fazer parte de listas, tuplas, dicionários, etc.

Os processos também tem membros que não existem em Threads, como pid, que é número do processo, exitcode, que é o código de erro de saída do processo filho.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import multiprocessing as mp
import mp_support as ms
import time

def run_thing(data, rest):
    a = 10/data
    time.sleep(rest)

if __name__ == "__main__":
    procs = [mp.Process(target=run_thing, args=(d, r)) for d, r in\
                 zip((2, 0, 0, 2), (12, 7, 4, 6)) ]
    for p in procs:
        p.start()
        ms.info("{name} has pid {pid}.".format(name=p.name, pid=p.pid))
    for p in procs:
        p.join()
    for p in procs:
        ms.info("{} exit code is: {}".format(p.name, p.exitcode))

Existem outros membros desta classe que provavelmente não devem ser usadas por pessoas normais em condições normais. Você pode ver a documentação do módulo se estiver interessado.

Mecanismos de lançamento de Processos

Infelizmente o mecanismo para criação de processos é um pouco dependente da plataforma na qual o programa está rodando. Há três alternativas:

spawn
O processo pai dá partida a um novo interpretador do Python. O processo filho só herda o estritamente necessário para rodar o seu método run(). Isto é bem caro comparado com os outros, existe no Unix e no Windoes e é o padrão no Windows.
fork
O processo pai chama os.fork(), que cria uma cópia na memória do interpretador Python. O processo filhor é idêntico ao processo pai, quando começa, e todos os seus recursos são herdados, como arquivos abertos, etc. Não é uma boa ideia usar isto se o processo pai for multithreaded. Só existe no Unix, onde é o padrão.
forkserver
O sistema cria um processo servidor responsável pela criação de processos através do os.fork(). Este processo é single threaded então pode chamar fork sem problemas. Nada não essencial é compartilhado. Disponível em apenas algumas versões do Unix.

Para escolher o mecanismo de lançamento de processos usamos a função set_start_method() do módulo multiprocessing, que só deve ser chamada uma vez por programa, na parte que é executada uma única vez!

Este programa pode não funcionar, dependendo de onde o programa é executado. Em particular, não funciona se você rodar dentro do spyder no linux, que já seleciona o contexto automaticamente.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing as mp
import mp_support as ms
import time

class RunThing(mp.Process):
    def __init__(self, rest_for=0):
        super().__init__()
        self._r = rest_for

    def run(self):
        ms.info("Started Process {}, sleeping for {}s".\
               format(self.name, self._r))
        time.sleep(self._r)
        ms.info("Process {} finished".format(self.name))

if __name__ == "__main__":
    methods = ('spawn', 'fork', 'forkserver')
    m = methods[0]
    ms.info("Created processes with {} method".format(m))
    mp.set_start_method(m)
    procs = [ RunThing(rest) for rest in (12, 7, 4, 6) ]
    for p in procs:
        p.start()
        ms.info("{}: {}".format(p.name, p.pid))
    for p in procs:
        p.join()

Por este e outros motivos é importante que você se certifique que o código que vai ser executado como um novo processo pode ser carregado como um módulo sem que ele tente gerar outros processos. Por exemplo, o código a seguir falha.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import multiprocessing as mp
import mp_support as ms
import time

def run_thing(name, rest_for):
    ms.info("Started Process {}, sleeping for {}s".\
            format(name, rest_for))
    time.sleep(rest_for)
    ms.info("Process {} finished".format(name))

mp.set_start_method('spawn')
p = mp.Process(target=run_thing, args=['proc_1', 10])
p.start()
p.join()

Perceba que a única diferença é a falta da proteção da linha

1
if __name__ == "__main__":

que impede que o resto do arquivo seja executado quando é carregado como um módulo.

É interessante rodar o programa abaixo com os três métodos de lançamento e estudar os resultados no seu sistema.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import multiprocessing as mp
import mp_support as ms
import time
import os

def showids(title):
    ms.info(title)
    ms.info('Module name: {}'.format(__name__))
    ms.info('Parent process id: {}'.format(os.getppid()))
    ms.info('This process id: {}'.format(os.getpid()))

def run_thing(myId, rest_for):
    name = "Proc_{:02d}".format(myId)
    showids('*** run_thing '+name)
    ms.info("Started Process {}, sleeping for {}s".\
            format(name, rest_for))
    time.sleep(rest_for)
    ms.info("Process {} finished".format(name))

if __name__ == "__main__":
    mp.set_start_method('spawn')
    showids('*** Main program')
    p1 = mp.Process(target=run_thing, args=[0, 10])
    p2 = mp.Process(target=run_thing, args=[1, 5])
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Comunicação

O processos normalmente tem que trocar informações durante a execução de um programa, e não podem fazer isto através de variáveis compartilhadas. Há algumas classes que permitem a comunicação entre processos filhos e entre o processo pai e seus filhos.

É claro que existe uma comunicação entre pai e filhos que pode ser feita através da passagem de argumentos e retorno de valores de função, mas precisamos de mecanismos mais gerais.

Queues

Uma fila (Queue) é uma implementação para multiprocessamento da estrutura de dados sequencial Queue. Basicamente, é uma sacola onde você vai enfiando e tirando coisas, com a garantia de que as coisas saem na mesma ordem que entraram, isto é, o primeiro que entra é o primeiro que sai, bem, como em um fila.

Você pode atribuir um tamanho máximo à fila, ou ela pode crescer indefinidamente, enquanto houver memória. Se a fila estiver cheia, a chamada que adiciona items à fila bloqueia até que haja espaço ou ocorra um timeout.

Um exemplo simples está mostrado a seguir.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import multiprocessing as mp
import mp_support as ms
from   numpy.random import randint

def run_thing(q):
    q.put(randint(0, 20, 10))

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=run_thing, args=[q])
    p.start()
    p.join()
    # Process already dead
    while not q.empty():
        ms.info("Got item: {}".format(q.get()))

Uma fila pode ter vários processos alimentando e vários processos consumindo, e, como estamos no mundo encantado do Python, os objetos podem ser de qualquer tipo, dentro da razoabilidade. Não é uma boa ideia querer enviar um handle de um arquivo aberto de um processo para outro que pode estar em uma outra máquina que sequer tem acesso àquele arquivo, por exemplo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import multiprocessing as mp
import queue
import mp_support as ms
import time
import random

def waitabit():
    time.sleep(random.random()+0.1)

def run_thing_a(q):
    for i in range(20):
        waitabit()  
        q.put(i)
    ms.info("run_thing_a put everything.")

def run_thing_b(q):
    for l in "qwertyuiopasdfghjklçzxcvmbn<>":
        waitabit()  
        q.put(l)
    ms.info("run_thing_b put everything.")


def run_thing_c(q):
    for l in "!@#$%*(*()_+=§{[ªº}{]/?°;:\|":
        waitabit()  
        q.put(l)
    ms.info("run_thing_c put everything.")

def get_thing(q):
    while True: # Need a better termination control
        try:
            ms.info("Got item: {}".format(q.get(timeout=2)))
        except queue.Empty:
            break

if __name__ == "__main__":
    q = mp.Queue()
    procs = [ mp.Process(target=run_thing_a, args=[q]),
              mp.Process(target=run_thing_b, args=[q]),
              mp.Process(target=run_thing_c, args=[q]),
              mp.Process(target=get_thing, args=[q]),
              mp.Process(target=get_thing, args=[q])]

    for p in procs:
        p.start()
    for p in procs:
        p.join()
    ms.info("Father finished waiting.")

Um problema com o programa acima é que não sabemos a priori quantos items serão colocados na fila. Do jeito que o programa está feito, não podemos apenas verificar se a fila está vazia, pois, como os processos podem ser lançados em qualquer ordem, não é inconcebível que um consumidor teste a fila antes de qualquer produtor colocar alguma coisa dentro. Além disto, podemos dar o azar de que, devido aos atrasos na produção, não haja nada fila, mas os produtores não terminaram ainda. Por isto temos aquele laço infinito na linha 30 do qual só escapamos com um timeout.

Timeouts como agentes de controle do fluxo de execução do programa são normalmente muito questionáveis, devido à dificuldade em se estabelecer valores que não causem paradas falsas ou que não atrasem demais a execução. No entanto, com o que temos até agora, não temos outra alternativa que não complicasse demasiadamente o programa.

Uma primeira ideia seria cada produtor enviar uma “sentinela”, um valor especial que não deve ser processado para avisar que já encerrou a produção. O problema é que o produtor não sabe qual consumidor vai receber aquela sentinela, é possível que um único produtor receba todas e o outro não saiba que os produtores encerraram.

É possível, por exemplo, cada produtor enviar um número de sentinelas igual ao número de consumidores. Cada consumidor lê sentinelas até que o número de sentinelas lidas seja igual ao de produtores, e quando isto acontece, encerra suas atividades.

O programa abaixo implementa isto e também algumas óbvias melhorias em relação ao anterior.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing as mp
import mp_support as ms
import time
import random

sentinel = -1

def waitabit():
    time.sleep(random.random()+0.1)

def run_thing(q, ncons, data):
    for d in data:
        waitabit()  
        q.put(d)
    for i in range(ncons):
        q.put(sentinel)
    ms.info("run_thing put everything.")

def get_thing(q, nprod):
    np = 0
    while np < nprod:
        data = q.get()
        if data != sentinel:
            ms.info("Got item: {}".format(data))
        else:
            np += 1
    ms.info("Consumer finished!")

if __name__ == "__main__":
    data = (range(25),
            "qwertyuiopasdfghjklçzxcvmbn<>",
            "!@#$%*(*()_+=§{[ªº}{]/?°;:\|")
    q = mp.Queue()
    ncons = 2
    nprod = len(data)
    prods = [ mp.Process(target=run_thing, args=[q, ncons, d]) for d in data]
    cons = [mp.Process(target=get_thing, args=[q, nprod]) for i in range(ncons)]
    for p in prods+cons:
        p.start()
    for p in prods+cons:
        p.join()
    ms.info("Father finished waiting.")

Percebam que o programa acima não trata de um dos principais problemas deste tipo de sistema, que é como garantir que todos os consumidores recebam mais ou menos a mesma quantidade de dados. Não há nenhuma garantia de que um dos consumidores não processe todos os dados que foram colocados na fila, por exemplo. Evitar isto é um problema bem mais sofisticado que vamos fingir que não existe, mas que pode ser tratado com a criação de um scheduler que imponha alguma “justiça” na divisão do trabalho. Quem estiver interessado neste assunto deve consultar algum livro sobre implementação de sistemas operacionais.

Pipes

Devido ao fato de filas serem bem genéricas, existe outra classe de objetos usados para comunicação que é mais específica, que são os pipes, que, essencialmente, são filas que tem apenas dois processos. Os pipes podem ser unidirecionais ou bidirecionais, e podem ser criados entre quaisquer dois processos, que podem ser irmãos ou pai e filho.

Um item é enviado com .send() em uma extremidade e recebido com .recv() na outra. Também existem versões que enviam buffers de bytes diretamente se este for o seu interesse.

Vamos ver um exemplo bem simples de um pipe em ação.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import multiprocessing as mp
import mp_support as ms
import time
import random

sentinel = -1

def waitabit():
    time.sleep(random.random()+0.1)

def run_thing(pipe):
    for l in "!@#$%*(*()_+=§{[ªº}{]/?°;:\|":
        waitabit()  
        pipe.send(l)
    pipe.send(sentinel)
    ms.info("run_thing sent everything.")

def get_thing(pipe):
    while True: # Need a better termination control
        data = pipe.recv()
        ms.info("Got item: {}".format(data))
        if data == sentinel:
            break

if __name__ == "__main__":
    pend1, pend2 = mp.Pipe(False) # not duplex
    procs = [ mp.Process(target=run_thing, args=[pend1]),
              mp.Process(target=get_thing, args=[pend2])]

    for p in procs:
        p.start()
    for p in procs:
        p.join()
    ms.info("Father finished waiting.")

O exemplo abaixo é um pouco mais sofisticado, pois envolve a comunicação bidirecional em um pipe (o que é o default) e a comunicação entro o pais e seus vários filhos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import multiprocessing as mp
import mp_support as ms
import time
import random

sentinel = -1

def waitabit():
    time.sleep(random.random()+0.1)

class Computator(mp.Process):
    def __init__(self, conn):
        super().__init__()
        self.c = conn

    def run(self):
        res = 0
        while True: # Need a better termination control
            data = self.c.recv()
            ms.info("{} got item: {}".format(self.name, data))
            if data == sentinel:
                break
            res += data
        self.c.send(res)

if __name__ == "__main__":
    nworker = 10
    ntask = 20 # per worker
    procs = []
    for i in range(nworker):
        c1, c2 = mp.Pipe()
        p = Computator(c2)
        procs.append((p, c1))

    for p, _ in procs:
        p.start()

    data = 1
    for d in range(ntask):
        for _, c in procs: 
            c.send(data)
            data +=1
    for _, c in procs:
        c.send(sentinel)
    res = 0
    for _, c in procs:
        res += c.recv()

    ms.info("Father totalled: {}".format(res))
    for p, _ in procs:
        p.join()

Sincronização

Todas as primitivas de sincronização que vimos para threads também estão disponíveis para o multiprocessamento, podemos serializar o acesso a um recurso com um Lock(), ou sincronizar os processos com .Barrier(), por exemplo.

O exemplo abaixo mostra como podemos serializar o acesso à escrita de um arquivo. O programa verifica se o arquivo existe e o remove antes de começar a escrever, de forma que sempre é escrito um novo arquivo. É necessária uma certa lógica para garantir que apenas um thread realize esta operação.

É claro que, para este caso específico, seria muito mais simples fazer esta verificação e a remoção do arquivo pré-existente no programa principal. Estamos fazendo a operação dentro dos processos para demonstrar o procedimento de como garantir que apenas um processo, que você não sabe qual é a priori, faça alguma coisa.

Outra alternativa, mais simples do que esta, seria dar nomes conhecidos ao processo, e escolher um deles a priori para fazer o teste de existência e a remoção. Isto ainda necessitaria de uma sincronização, é claro.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import multiprocessing as mp
import mp_support as ms
import datetime as dt
import os

class Escrevator(mp.Process):
    def __init__(self, fname, lock, semaphore, event):
        super().__init__()
        self._l = lock
        self._f = fname
        self._s = semaphore
        self._e = event

    def run(self):
        self._s.acquire() # one thread checks and deletes file
        if self._e.is_set():
            self._s.release() # file already erased
        else:
            try:
                os.remove(self._f)
                self._e.set()
                self._s.release()
            except OSError as e:
                ms.info('{name}: {error} removing file {file}'.\
                        format(name=self.name, error=e, file=self._f))
                self._e.set()
                self._s.release()

        with open(self._f, "a") as outf:
            for i in range(10):
                with self._l:
                    outf.write('{name}: line={line}, time={now}\n'.\
                        format(name=self.name, line=i, now=dt.datetime.now()))
                    outf.flush()
            outf.close()


if __name__ == "__main__":
    nworker = 10
    lock = mp.Lock()
    cleared_file = mp.Event() # starts false
    semaphore = mp.Semaphore(1)
    procs = [ Escrevator('lock_test.txt', lock, semaphore, cleared_file) \
                                                     for i  in range(nworker) ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    ms.info('Main process finished execution')

Referências

  1. Documentação do módulo multiprocessing.