Threading acontece quando várias linhas de execução de um programa ocorrem simultaneamente, e todas acessam o mesmo espaço de memória.
Isto corresponde muito diretamente ao modelo de computação paralela com memória compartilhada, e é um modelo de programação muito empregado em aplicativos comerciais e sistemas operacionais para aproveitar, por exemplo, os múltiplos núcleos de um processador moderno.
Usar threads implica, no entanto, em uma programação de baixo nível, no sentido de que o programador é responsável por muitas coisas que poderiam ser feitas automaticamente pela própria linguagem ou pelo sistema operacional.
Tipicamente, o controle de acesso às variáveis compartilhadas é feito pelo próprio programador explicitamente, e isto pode ser bastante complicado. Em programação científica, a programação direta com threads é usada muito raramente. Em geral, usamos um sistema de mais alto nível, que tem primitivas mais relacionadas com as atividades de programação científica, conhecido como OpenMP.
Infelizmente, na implementação padrão da linguagem Python, existe o odiado GIL, de Global Interpreter Lock. Devido a dificuldades de implementação do interpretador da linguagem Python, motivos históricos e razões técnicas, o acesso às variáveis internas do interpretador Python é bloqueado para todos os threads menos um a cada instante de tempo.
O accesso às variáveis é serializado e isto significa que apenas um thread executa a cada instante de tempo, mesmo que, aparentemente, todos estejam sendo executados. Isto siginifica também que, mesmo que você tenha quatro threads e quatro núcleos disponíveis no processador, o seu programa não deve executar mais rapidamente, a menos de condições extremamente especiais.
Por que existem threads em Python então? Algumas vezes, um thread pode estar "executando" sem gastar CPU, como por exemplo quando está esperando por algum entrada via rede ou sistema de arquivos, ou esperando um dispositivo tornar-se disponível, etc. A programação com threads torna este tipo de programa mais simples e possivelmente mais eficiente.
Threads são integrados na biblioteca padrão do Python (de uma versão relativamente recente), portanto você pode usá-los simplesmente importanto o módulo como mostrado a seguir.
import threading
A partir daí, você pode acessar as classes e funções deste módulo nos seus programas.
Não é estritamente necessário, mas é útil usar o módulo logging, que permite que você
escreva mensagens com data e hora, automaticamente, e redirecionar para vários destinos.
Vamos ver um exemplo aqui.
import logging import time if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") time.sleep(5) logging.info("Main program ended!")
O principal objeto para programação com threads em Python é a classe Thread.
Um objeto desta classe corresponde a um fluxo de execução, e você pode associar código a este objeto de duas maneiras diferentes, durante a construção do objeto ou sobrecarrando o método .run do objeto. Vamos ver exemplos destas duas formas. Um thread começa a executar quando o seu método .start é chamado.
Vamos ver um exemplo simples com .start
import threading as td import logging import time def thread_prog(name, rest_for): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=name, rest=rest_for)) time.sleep(rest_for) logging.info('Finished execution of {name}'.format(name=name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") t1 = td.Thread(target=thread_prog, args=('Thread 1', 5)) t1.start() logging.info("Main program ended!")
E aqui a mesma coisa com um thread como um novo objeto.
import threading as td import logging import time class RunThing(td.Thread): def __init__(self, name, rest_for): super().__init__() self.name = name self.rf = rest_for def run(self): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=self.name, rest=self.rf)) time.sleep(self.rf) logging.info('Finished execution of {name}'.format(name=self.name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") t1 = RunThing('Thread 1', 5) t1.start() logging.info("Main program ended!")
Vamos ver o que acontece quando colocamos mais de uma thread executando simultaneamente.
class RunThing(td.Thread): def __init__(self, name, rest_for): super().__init__() self.name = name self.rf = rest_for def run(self): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=self.name, rest=self.rf)) time.sleep(self.rf) logging.info('Finished execution of {name}'.format(name=self.name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") t1 = RunThing('Thread 1', 5) t2 = RunThing('Thread 2', 10) t3 = RunThing('Thread 3', 15) t3.start() t2.start() t1.start() logging.info("Main program ended!")
Vocês devem ter percebido que o programa principal termina antes dos threads, o que é bem estranho. Nós podemos, no entanto, suspender a execução do programa enquanto ele aguarda algum thread terminar.
No Python existem daemon threads, que são encerradas automaticamente quando progama termina. Rode o programa anterior novamente, com esta nova definição para a classe.
def __init__(self, name, rest_for): super().__init__(daemon=True) self.name = name self.rf = rest_for
Normalmente, não é isto que você deseja, então existem maneiras de esperar que threads terminem antes de continuar a execução, usando o método '.join', como no exemplo abaixo.
class RunThing(td.Thread): def __init__(self, name, rest_for): super().__init__(daemon=True) self.name = name self.rf = rest_for def run(self): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=self.name, rest=self.rf)) time.sleep(self.rf) logging.info('Finished execution of {name}'.format(name=self.name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") t1 = RunThing('Thread 1', 5) t2 = RunThing('Thread 2', 10) t3 = RunThing('Thread 3', 15) t3.start() t2.start() t1.start() t3.join() logging.info('Detected end of {}'.format(t3.name)) t2.join() logging.info('Detected end of {}'.format(t2.name)) t1.join() logging.info('Detected end of {}'.format(t1.name)) logging.info("Main program ended!")
Perceba que isto impõe uma ordem na detecção do final de cada thread e isto pode ter sérias consequências como veremos adiante.
Claramente enumerar explicitamente as threads de um programa é muito restritivo, mas, como threads são objetos normais do Python, podemos guardá-los em qualquer estrutura de dados da linguagem, como listas, dicionários, tuplas, etc., por exemplo,
import threading as td import logging import time import random class RunThing(td.Thread): def __init__(self, idx): super().__init__(daemon=True) self.name = 'Thread_{:02d}'.format(idx) self.rest_for = random.randint(1, 10) def run(self): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=self.name, rest=self.rest_for)) time.sleep(self.rest_for) logging.info('Finished execution of {name}'.format(name=self.name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Creating threads!") threads = [ RunThing(idx) for idx in range(10)] logging.info("Starting threads!") for t in threads: t.start() logging.info("Waiting for termination threads!") for t in threads: t.join() logging.info("Main program ended!")
Novamente, perceba que o trecho
for t in threads: t.join()
implica que você está esperando os threads terminarem, sequencialmente, na mesma ordem em que você chamou os métodos .join.
Em muitos casos, não é isto o que você quer, você deseja normalmente ser notificado imediatamente que algum thread terminou para tomar alguma atitude, como por exemplo iniciar um novo thread com novos dados. Para fazer isto precisamos introduzir outras coisas.
Para gerenciar threads de maneira mais sofisticada, passamos a não criá-las diretamente, e usamos objetos especiais da biblioteca do Python, que estão no módulo concurrent.futures.
Estes objetos são Executors, que gerenciam entidades que rodam em paralelo, e futures, que
representam uma computação que não retorna imediatamente, e cujo resultado vai ser fornecido por
ela em algum tempo no futuro. Existem Executors que gerenciam threads e que gerenciam processos,
vamos nos preocupar, por enquanto, apenas com aqueles que gerenciam threads.
Um Executor roda uma coisa que pode ser chamada, como uma função, assincronamente, e retorna um
future. Este objeto pode ser usado para esperar a execução do future terminar, cancelar a
execução, capturar o valor de retorno da função, etc.
Vamos ver alguns exemplos simples da funcionalidade destes objetos.
No trecho abaixo, estamos chamando uma função que apenas eleva alguma coisa ao quadrado. Note o uso do gerenciador de contexto.
from concurrent.futures import ThreadPoolExecutor import logging import time def run_thing(rest_for, data): time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: f = executor.submit(run_thing, 10, 4) r = f.result() logging.info('run_thing returned: {}'.format(r)) logging.info("Main program ended!")
Observe que não há razão prática alguma para escrever o programa assim, neste caso poderíamos chamar a função diretamente e não haveria nenhuma diferença. No entanto temos muita flexibilidade agora, podemos, por exemplo, verificar se a chamada terminou, desistir da chamada após um certo tempo e muitas outras coisas. Também supostamente é possível cancelar um thread se você tiver muita sorte, mas parece que isto é, em geral, uma péssima ideia.
from concurrent.futures import ThreadPoolExecutor, TimeoutError import logging import time def run_thing(rest_for, data): time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: f = executor.submit(run_thing, 15, 4) try: r = f.result(timeout=5) except TimeoutError: logging.info('Time out!') if f.running(): logging.info('Still running!') if f.done(): logging.info('Finished execution!') else: logging.info('Not done yet!') r = f.result() logging.info("Result of call is: {}".format(r)) logging.info("Main program ended!")
Como futures são assíncronos, podemos ter mais de um em execução simultaneamente.
from concurrent.futures import ThreadPoolExecutor
import logging import random import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: fs = [ executor.submit(run_thing, random.randint(1,10), 4*i) for i in range(1,6) ] logging.info("Waiting for thread termination.") res = [ f.result() for f in fs ] logging.info("Results of calls are: {}.".format(res)) logging.info("Main program ended!")
Em termos de evitar uma sequencialização do término, não ganhamos nada com isto, pois a linha
res = [ f.result() for f in fs ]
claramente implica na espera sequencial pelo término dos threads, na mesma ordem em que eles foram criados.
Este programa pode ser reescrito de forma muito mais elegante com o método .map de um executor.
from concurrent.futures import ThreadPoolExecutor import numpy as np import logging import random import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: nt = 5 rests = np.random.randint(1,10, nt) vals = [ 4*(i+1) for i in range(nt) ] logging.info("Waiting for thread resuls.") res = executor.map(run_thing, rests, vals) logging.info("Results of calls are: {}.".format(list(res))) logging.info("Main program ended!")
Existem duas funções do módulo concurrent.futures que pode ser usadas para tratar os threads
tão logo terminem, as_completed e wait.
A função as_completed retorna um iterador sobre os futures que vai fornecendo um future à medida
em que eles vão completando.
from concurrent.futures import ThreadPoolExecutor, as_completed import logging import numpy as np import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ] logging.info("Waiting for thread termination.") for f in as_completed(fs): logging.info("Got result {}.".format(f.result())) logging.info("Main program ended!")
A função wait é mais complexa. Em primeiro lugar, você tem que dizer se quer retornar quando
todos os threads terminem, quando o primeiro thread termine ou quando a primeira exceção aconteça,
com os flags ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION.
Esta função retorna dois conjuntos em uma tupla nomeada, o conjunto dos futures que terminaram e o conjunto daqueles que ainda não terminaram. É mais fácil ver isto em um exemplo.
from concurrent.futures import ThreadPoolExecutor, wait import logging import numpy as np import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ] logging.info("Waiting for thread termination.") all_fs = wait(fs) logging.info("Completed threads:") for f in all_fs.done: logging.info("Got result {}.".format(f.result())) for f in all_fs.not_done: logging.info("Should not happen!") logging.info("Main program ended!")
Como vimos acima, você pode esperar por uma de cada vez, também. Observe que no programa acima usamos
o valor default para return_when.
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED import logging import numpy as np import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") with ThreadPoolExecutor() as executor: fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ] logging.info("Waiting for thread termination.") while True: ffs = wait(fs, return_when=FIRST_COMPLETED) if ffs.done: for f in ffs.done: logging.info("Got result {}.".format(f.result())) else: break fs = ffs.not_done logging.info("Main program ended!")
O programa acima não tem aparentemente muitas vantagens em relação a usar as_completed, mas
agora podemos ir adicionando novas threads quando detectamos o término de uma.
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED import logging import numpy as np import time def run_thing(rest_for, data): logging.info("Waiting for {} seconds.".format(rest_for)) time.sleep(rest_for) return data*data if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") max_threads=30 start_threads = 10 run_threads = start_threads with ThreadPoolExecutor() as executor: fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,start_threads) ] logging.info("Waiting for thread termination.") while True: ffs = wait(fs, return_when=FIRST_COMPLETED) if ffs.done: newfs = list() for f in ffs.done: logging.info("Got result {}.".format(f.result())) t = np.random.randint(1,20,1)[0] if run_threads < max_threads: newfs.append(executor.submit(run_thing, t, t)) run_threads += 1 else: break fs = list(ffs.not_done)+newfs logging.info("Main program ended!")
O esquema acima é bem genérico e flexível, e permite que mantenhamos o número de threads em execução mais ou menos constante, como em um típico esquema "bag of tasks".
Um dos motivos para estudarmos threads, mesmo sem nenhum aumento de desempenho, é que podemos ver alguns dos problemas que podem ocorrer em computação paralela em um contexto muito simples.
Um programa está em deadlock quando todos os threads estão parados, esperando por alguma coisa que não pode acontecer. Isto normamente ocorre por causa de alguma dependência cruzada.
Execute o programa a seguir e perceba que os threads nunca terminam, já que o primeiro espera o segundo terminar, e o segundo espera o primeiro terminar.
import threading as td import logging import time class RunThing(td.Thread): def __init__(self, name, rest_for): super().__init__() self.name = name self.rf = rest_for self.sis = None def set_sis(self, sis): self.sis = sis def run(self): logging.info('Started thread {name}, will rest for {rest}s'.\ format(name=self.name, rest=self.rf)) time.sleep(self.rf) self.sis.join() logging.info('Finished execution of {name}'.format(name=self.name)) if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Main program started!") t1 = RunThing('Thread 1', 10) t2 = RunThing('Thread 2', 10) t2.start() t1.start() t1.set_sis(t2) t2.set_sis(t1) logging.info("Main program ended!")
Este exemplo é um pouco forçado, mas isto acontece com frequência em programas que tem dependências temporais não triviais, e normalmente não é muito fácil de resolver. Quando o seu programa não termina, o problema geralmente é isto.
Até agora vivemos no conforto de não tocarmos em nenhuma variável compartilhada pelos threads, mesmo sendo esta uma das principais características deste modelo de computação.
O principal problema de compartilhar variáveis é que o valor de uma variável pode ser alterado por outro thread enquanto aquela variável é usada em uma operação por um thread!
Imagine o caso mais simples possível, no qual a é uma variável compartilhada por mais de
um thread.
a = a + 1
Suponha que o valor inicial de a seja 0, e que 10 threads vão executar esta operação mais ou
menos ao mesmo tempo. Inocentemente poderíamos esperar que o valor final de a fosse 10, e isto
pode até acontecer.
Por outro lado, também pode acontecer que todos os threads tenham lido o valor da variável
original ao mesmo tempo, incrementado 1, e salvo o valor 1 na variável a, o que será o seu
valor final.
Sem algum controle de acesso àquela variável, é impossível prever o valor final da variável em um sistema arbitrário.
O exemplo abaixo ilustra isto.
from concurrent.futures import ThreadPoolExecutor, wait import threading as td import logging import time class RunThing(): def __init__(self, count): self.count = count self.counter = 0 def update(self): for i in range(self.count): self.counter += 1 if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Starting threads!") nt = 100 run_thing = RunThing(100000) with ThreadPoolExecutor(max_workers=nt) as executor: futures = [executor.submit(run_thing.update) for t in range(nt)] wait(futures) logging.info("Computed value s {}.".format(run_thing.counter)) logging.info("Main program ended!")
No meu sistema, executando este programa 4 vezes, obtive os resultados 353517, 315917, 325583, e 340747. O programa não é determinístico.
Para sabermos o valor correto sem pensar, precisamos proteger o acesso à variável compartilhada
self.counter com um lock.
from concurrent.futures import ThreadPoolExecutor, wait import threading as td import logging import time class RunThing(): def __init__(self, count): self.count = count self.counter = 0 self._lock = td.Lock() def update(self): for i in range(self.count): with self._lock: self.counter += 1 if __name__ == "__main__": format = "%(asctime)s: %(message)s" logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") logging.info("Starting threads!") nt = 100 run_thing = RunThing(100000) with ThreadPoolExecutor(max_workers=nt) as executor: futures = [executor.submit(run_thing.update) for t in range(nt)] wait(futures) logging.info("Computed value s {}.".format(run_thing.counter)) logging.info("Main program ended!")
O programa passou a ser determinístico, mas agora obviamente sequencializamos o acesso a variável compartilhada. Se este trecho de código fosse importante, o desempenho do programa seria grandemente prejudicado. Claramente estamos falando não da implementação do Python, onde threads não aumentam a performance do código em programas de alta intensidade computacional.
Há outros objetos interessantes no módulo threading, para desenvolver programas paralelos mais
complexos usando threads, vale a pena ler a documentação do módulo threading, mas o que vimos
até agora é o suficiente para o nosso uso, lembrando que estamos estudando threads muito mais pelo
seu valor educacional.
Vamos ver um exemplo que usa threads para resolver um problema de "engenharia". Vamos lembrar pela milésima vez que não podemos esperar ganho de desempenho desta aplicação com a implementação padrão do Python. Vamos estudar a solução de um problema de Poisson em uma dimensão com o método das diferenças finitas.
Documentação do módulo threading na biblioteca padrão.
Documentação do módulo concurrent.futures na biblioteca padrão.
Excelente tutorial sobre threads e base desta aula.