/ramiro/demec/ufpe

Threading

Introdução

Threading acontece quando várias linhas de execução de um programa ocorrem simultaneamente, e todas acessam o mesmo espaço de memória.

Isto corresponde muito diretamente ao modelo de computação paralela com memória compartilhada, e é um modelo de programação muito empregado em aplicativos comerciais e sistemas operacionais para aproveitar, por exemplo, os múltiplos núcleos de um processador moderno.

Usar threads implica, no entanto, em uma programação de baixo nível, no sentido de que o programador é responsável por muitas coisas que poderiam ser feitas automaticamente pela própria linguagem ou pelo sistema operacional.

Tipicamente, o controle de acesso às variáveis compartilhadas é feito pelo próprio programador explicitamente, e isto pode ser bastante complicado. Em programação científica, a programação direta com threads é usada muito raramente. Em geral, usamos um sistema de mais alto nível, que tem primitivas mais relacionadas com as atividades de programação científica, conhecido como OpenMP.

GIL

Infelizmente, na implementação padrão da linguagem Python, existe o odiado GIL, de Global Interpreter Lock. Devido a dificuldades de implementação do interpretador da linguagem Python, motivos históricos e razões técnicas, o acesso às variáveis internas do interpretador Python é bloqueado para todos os threads menos um a cada instante de tempo.

O accesso às variáveis é serializado e isto significa que apenas um thread executa a cada instante de tempo, mesmo que, aparentemente, todos estejam sendo executados. Isto siginifica também que, mesmo que você tenha quatro threads e quatro núcleos disponíveis no processador, o seu programa não deve executar mais rapidamente, a menos de condições extremamente especiais.

Por que existem threads em Python então? Algumas vezes, um thread pode estar "executando" sem gastar CPU, como por exemplo quando está esperando por algum entrada via rede ou sistema de arquivos, ou esperando um dispositivo tornar-se disponível, etc. A programação com threads torna este tipo de programa mais simples e possivelmente mais eficiente.

Pré-requisitos

Threads são integrados na biblioteca padrão do Python (de uma versão relativamente recente), portanto você pode usá-los simplesmente importanto o módulo como mostrado a seguir.

import threading

A partir daí, você pode acessar as classes e funções deste módulo nos seus programas.

Logging

Não é estritamente necessário, mas é útil usar o módulo logging, que permite que você escreva mensagens com data e hora, automaticamente, e redirecionar para vários destinos. Vamos ver um exemplo aqui.

import logging
import time

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    time.sleep(5)
    logging.info("Main program ended!")

Criação de threads

O principal objeto para programação com threads em Python é a classe Thread.

Um objeto desta classe corresponde a um fluxo de execução, e você pode associar código a este objeto de duas maneiras diferentes, durante a construção do objeto ou sobrecarrando o método .run do objeto. Vamos ver exemplos destas duas formas. Um thread começa a executar quando o seu método .start é chamado.

Vamos ver um exemplo simples com .start

import threading as td
import logging
import time

def thread_prog(name, rest_for):
    logging.info('Started thread {name}, will rest for {rest}s'.\
                 format(name=name, rest=rest_for))
    time.sleep(rest_for)
    logging.info('Finished execution of {name}'.format(name=name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    t1 = td.Thread(target=thread_prog, args=('Thread 1', 5))
    t1.start()
    logging.info("Main program ended!")

E aqui a mesma coisa com um thread como um novo objeto.

import threading as td
import logging
import time

class RunThing(td.Thread):
    def __init__(self, name, rest_for):
        super().__init__()
        self.name = name
        self.rf = rest_for

    def run(self):
        logging.info('Started thread {name}, will rest for {rest}s'.\
                     format(name=self.name, rest=self.rf))
        time.sleep(self.rf)
        logging.info('Finished execution of {name}'.format(name=self.name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    t1 = RunThing('Thread 1', 5)
    t1.start()
    logging.info("Main program ended!")

Múltiplos threads

Vamos ver o que acontece quando colocamos mais de uma thread executando simultaneamente.

class RunThing(td.Thread):
    def __init__(self, name, rest_for):
        super().__init__()
        self.name = name
        self.rf = rest_for

    def run(self):
        logging.info('Started thread {name}, will rest for {rest}s'.\
                     format(name=self.name, rest=self.rf))
        time.sleep(self.rf)
        logging.info('Finished execution of {name}'.format(name=self.name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    t1 = RunThing('Thread 1', 5)
    t2 = RunThing('Thread 2', 10)
    t3 = RunThing('Thread 3', 15)
    t3.start()
    t2.start()
    t1.start()
    logging.info("Main program ended!")

Término

Vocês devem ter percebido que o programa principal termina antes dos threads, o que é bem estranho. Nós podemos, no entanto, suspender a execução do programa enquanto ele aguarda algum thread terminar.

No Python existem daemon threads, que são encerradas automaticamente quando progama termina. Rode o programa anterior novamente, com esta nova definição para a classe.

def __init__(self, name, rest_for):
        super().__init__(daemon=True)
        self.name = name
        self.rf = rest_for

Normalmente, não é isto que você deseja, então existem maneiras de esperar que threads terminem antes de continuar a execução, usando o método '.join', como no exemplo abaixo.

class RunThing(td.Thread):
    def __init__(self, name, rest_for):
        super().__init__(daemon=True)
        self.name = name
        self.rf = rest_for

    def run(self):
        logging.info('Started thread {name}, will rest for {rest}s'.\
                     format(name=self.name, rest=self.rf))
        time.sleep(self.rf)
        logging.info('Finished execution of {name}'.format(name=self.name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    t1 = RunThing('Thread 1', 5)
    t2 = RunThing('Thread 2', 10)
    t3 = RunThing('Thread 3', 15)
    t3.start()
    t2.start()
    t1.start()
    t3.join()
    logging.info('Detected end of {}'.format(t3.name))
    t2.join()
    logging.info('Detected end of {}'.format(t2.name))
    t1.join()
    logging.info('Detected end of {}'.format(t1.name))
    logging.info("Main program ended!")

Perceba que isto impõe uma ordem na detecção do final de cada thread e isto pode ter sérias consequências como veremos adiante.

Listas de Threads

Claramente enumerar explicitamente as threads de um programa é muito restritivo, mas, como threads são objetos normais do Python, podemos guardá-los em qualquer estrutura de dados da linguagem, como listas, dicionários, tuplas, etc., por exemplo,

import threading as td
import logging
import time
import random

class RunThing(td.Thread):
    def __init__(self, idx):
        super().__init__(daemon=True)
        self.name = 'Thread_{:02d}'.format(idx)
        self.rest_for = random.randint(1, 10)

    def run(self):
        logging.info('Started thread {name}, will rest for {rest}s'.\
                     format(name=self.name, rest=self.rest_for))
        time.sleep(self.rest_for)
        logging.info('Finished execution of {name}'.format(name=self.name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Creating threads!")
    threads = [ RunThing(idx) for idx in range(10)]
    logging.info("Starting threads!")
    for t in threads:
        t.start()
    logging.info("Waiting for termination threads!")
    for t in threads:
        t.join()
    logging.info("Main program ended!")

Novamente, perceba que o trecho

    for t in threads:
        t.join()

implica que você está esperando os threads terminarem, sequencialmente, na mesma ordem em que você chamou os métodos .join.

Em muitos casos, não é isto o que você quer, você deseja normalmente ser notificado imediatamente que algum thread terminou para tomar alguma atitude, como por exemplo iniciar um novo thread com novos dados. Para fazer isto precisamos introduzir outras coisas.

ThreadPoolExecutor

Para gerenciar threads de maneira mais sofisticada, passamos a não criá-las diretamente, e usamos objetos especiais da biblioteca do Python, que estão no módulo concurrent.futures.

Estes objetos são Executors, que gerenciam entidades que rodam em paralelo, e futures, que representam uma computação que não retorna imediatamente, e cujo resultado vai ser fornecido por ela em algum tempo no futuro. Existem Executors que gerenciam threads e que gerenciam processos, vamos nos preocupar, por enquanto, apenas com aqueles que gerenciam threads.

Futures

Um Executor roda uma coisa que pode ser chamada, como uma função, assincronamente, e retorna um future. Este objeto pode ser usado para esperar a execução do future terminar, cancelar a execução, capturar o valor de retorno da função, etc.

Vamos ver alguns exemplos simples da funcionalidade destes objetos.

No trecho abaixo, estamos chamando uma função que apenas eleva alguma coisa ao quadrado. Note o uso do gerenciador de contexto.

from concurrent.futures import ThreadPoolExecutor
import logging
import time

def run_thing(rest_for, data):
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        f = executor.submit(run_thing, 10, 4)
        r = f.result()
        logging.info('run_thing returned: {}'.format(r))
    logging.info("Main program ended!")

Observe que não há razão prática alguma para escrever o programa assim, neste caso poderíamos chamar a função diretamente e não haveria nenhuma diferença. No entanto temos muita flexibilidade agora, podemos, por exemplo, verificar se a chamada terminou, desistir da chamada após um certo tempo e muitas outras coisas. Também supostamente é possível cancelar um thread se você tiver muita sorte, mas parece que isto é, em geral, uma péssima ideia.

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import logging
import time

def run_thing(rest_for, data):
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        f = executor.submit(run_thing, 15, 4)
        try:
            r = f.result(timeout=5)
        except TimeoutError:
            logging.info('Time out!')
        if f.running():
            logging.info('Still running!')
        if f.done():
            logging.info('Finished execution!')
        else:
            logging.info('Not done yet!')
        r = f.result()
        logging.info("Result of call is: {}".format(r))

    logging.info("Main program ended!")

Como futures são assíncronos, podemos ter mais de um em execução simultaneamente. from concurrent.futures import ThreadPoolExecutor

import logging
import random
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        fs = [ executor.submit(run_thing, random.randint(1,10), 4*i) for i in range(1,6) ]
        logging.info("Waiting for thread termination.")
        res = [ f.result() for f in fs ]
        logging.info("Results of calls are: {}.".format(res))

    logging.info("Main program ended!")

Em termos de evitar uma sequencialização do término, não ganhamos nada com isto, pois a linha

        res = [ f.result() for f in fs ]

claramente implica na espera sequencial pelo término dos threads, na mesma ordem em que eles foram criados.

Este programa pode ser reescrito de forma muito mais elegante com o método .map de um executor.

from concurrent.futures import ThreadPoolExecutor
import numpy as np
import logging
import random
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        nt = 5
        rests = np.random.randint(1,10, nt)
        vals = [ 4*(i+1) for i in range(nt) ]
        logging.info("Waiting for thread resuls.")
        res = executor.map(run_thing, rests, vals)
        logging.info("Results of calls are: {}.".format(list(res)))

    logging.info("Main program ended!")

Quebrando a sequencialidade

Existem duas funções do módulo concurrent.futures que pode ser usadas para tratar os threads tão logo terminem, as_completed e wait.

A função as_completed retorna um iterador sobre os futures que vai fornecendo um future à medida em que eles vão completando.

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import numpy as np
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ]
        logging.info("Waiting for thread termination.")
        for f in as_completed(fs):
            logging.info("Got result {}.".format(f.result()))

    logging.info("Main program ended!")

A função wait é mais complexa. Em primeiro lugar, você tem que dizer se quer retornar quando todos os threads terminem, quando o primeiro thread termine ou quando a primeira exceção aconteça, com os flags ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION.

Esta função retorna dois conjuntos em uma tupla nomeada, o conjunto dos futures que terminaram e o conjunto daqueles que ainda não terminaram. É mais fácil ver isto em um exemplo.

from concurrent.futures import ThreadPoolExecutor, wait
import logging
import numpy as np
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ]
        logging.info("Waiting for thread termination.")
        all_fs = wait(fs)
        logging.info("Completed threads:")
        for f in all_fs.done:
            logging.info("Got result {}.".format(f.result()))
        for f in all_fs.not_done:
            logging.info("Should not happen!")

    logging.info("Main program ended!")

Como vimos acima, você pode esperar por uma de cada vez, também. Observe que no programa acima usamos o valor default para return_when.

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import logging
import numpy as np
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    with ThreadPoolExecutor() as executor:
        fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,10) ]
        logging.info("Waiting for thread termination.")
        while True:
            ffs = wait(fs, return_when=FIRST_COMPLETED)
            if ffs.done: 
                for f in ffs.done:
                    logging.info("Got result {}.".format(f.result()))
            else: break
            fs = ffs.not_done

    logging.info("Main program ended!")

O programa acima não tem aparentemente muitas vantagens em relação a usar as_completed, mas agora podemos ir adicionando novas threads quando detectamos o término de uma.

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import logging
import numpy as np
import time

def run_thing(rest_for, data):
    logging.info("Waiting for {} seconds.".format(rest_for))
    time.sleep(rest_for)
    return data*data

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    max_threads=30
    start_threads = 10
    run_threads = start_threads
    with ThreadPoolExecutor() as executor:
        fs = [ executor.submit(run_thing, t, t) for t in np.random.randint(1,20,start_threads) ]
        logging.info("Waiting for thread termination.")
        while True:
            ffs = wait(fs, return_when=FIRST_COMPLETED)
            if ffs.done: 
                newfs = list()
                for f in ffs.done:
                    logging.info("Got result {}.".format(f.result()))
                    t = np.random.randint(1,20,1)[0]
                    if run_threads < max_threads:
                        newfs.append(executor.submit(run_thing, t, t))
                        run_threads += 1
            else: break
            fs = list(ffs.not_done)+newfs

    logging.info("Main program ended!")

O esquema acima é bem genérico e flexível, e permite que mantenhamos o número de threads em execução mais ou menos constante, como em um típico esquema "bag of tasks".

Problemas

Um dos motivos para estudarmos threads, mesmo sem nenhum aumento de desempenho, é que podemos ver alguns dos problemas que podem ocorrer em computação paralela em um contexto muito simples.

Deadlock

Um programa está em deadlock quando todos os threads estão parados, esperando por alguma coisa que não pode acontecer. Isto normamente ocorre por causa de alguma dependência cruzada.

Execute o programa a seguir e perceba que os threads nunca terminam, já que o primeiro espera o segundo terminar, e o segundo espera o primeiro terminar.

import threading as td
import logging
import time

class RunThing(td.Thread):
    def __init__(self, name, rest_for):
        super().__init__()
        self.name = name
        self.rf = rest_for
        self.sis = None

    def set_sis(self, sis):
        self.sis = sis

    def run(self):
        logging.info('Started thread {name}, will rest for {rest}s'.\
                     format(name=self.name, rest=self.rf))
        time.sleep(self.rf)
        self.sis.join()
        logging.info('Finished execution of {name}'.format(name=self.name))

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Main program started!")
    t1 = RunThing('Thread 1', 10)
    t2 = RunThing('Thread 2', 10)
    t2.start()
    t1.start()
    t1.set_sis(t2)
    t2.set_sis(t1)
    logging.info("Main program ended!")

Este exemplo é um pouco forçado, mas isto acontece com frequência em programas que tem dependências temporais não triviais, e normalmente não é muito fácil de resolver. Quando o seu programa não termina, o problema geralmente é isto.

Race conditions

Até agora vivemos no conforto de não tocarmos em nenhuma variável compartilhada pelos threads, mesmo sendo esta uma das principais características deste modelo de computação.

O principal problema de compartilhar variáveis é que o valor de uma variável pode ser alterado por outro thread enquanto aquela variável é usada em uma operação por um thread!

Imagine o caso mais simples possível, no qual a é uma variável compartilhada por mais de um thread.

a = a + 1

Suponha que o valor inicial de a seja 0, e que 10 threads vão executar esta operação mais ou menos ao mesmo tempo. Inocentemente poderíamos esperar que o valor final de a fosse 10, e isto pode até acontecer.

Por outro lado, também pode acontecer que todos os threads tenham lido o valor da variável original ao mesmo tempo, incrementado 1, e salvo o valor 1 na variável a, o que será o seu valor final.

Sem algum controle de acesso àquela variável, é impossível prever o valor final da variável em um sistema arbitrário.

O exemplo abaixo ilustra isto.

from   concurrent.futures import ThreadPoolExecutor, wait
import threading as td
import logging
import time

class RunThing():
    def __init__(self, count):
        self.count = count
        self.counter = 0

    def update(self):
        for i in range(self.count):
            self.counter += 1

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Starting threads!")
    nt = 100
    run_thing = RunThing(100000)
    with ThreadPoolExecutor(max_workers=nt) as executor:
        futures = [executor.submit(run_thing.update) for t in range(nt)]
        wait(futures)

    logging.info("Computed value s {}.".format(run_thing.counter))
    logging.info("Main program ended!")

No meu sistema, executando este programa 4 vezes, obtive os resultados 353517, 315917, 325583, e 340747. O programa não é determinístico.

Para sabermos o valor correto sem pensar, precisamos proteger o acesso à variável compartilhada self.counter com um lock.

from   concurrent.futures import ThreadPoolExecutor, wait
import threading as td
import logging
import time

class RunThing():
    def __init__(self, count):
        self.count = count
        self.counter = 0
        self._lock = td.Lock()

    def update(self):
        for i in range(self.count):
            with self._lock:
                self.counter += 1

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    logging.info("Starting threads!")
    nt = 100
    run_thing = RunThing(100000)
    with ThreadPoolExecutor(max_workers=nt) as executor:
        futures = [executor.submit(run_thing.update) for t in range(nt)]
        wait(futures)

    logging.info("Computed value s {}.".format(run_thing.counter))
    logging.info("Main program ended!")

O programa passou a ser determinístico, mas agora obviamente sequencializamos o acesso a variável compartilhada. Se este trecho de código fosse importante, o desempenho do programa seria grandemente prejudicado. Claramente estamos falando não da implementação do Python, onde threads não aumentam a performance do código em programas de alta intensidade computacional.

Há outros objetos interessantes no módulo threading, para desenvolver programas paralelos mais complexos usando threads, vale a pena ler a documentação do módulo threading, mas o que vimos até agora é o suficiente para o nosso uso, lembrando que estamos estudando threads muito mais pelo seu valor educacional.

Exemplo

Vamos ver um exemplo que usa threads para resolver um problema de "engenharia". Vamos lembrar pela milésima vez que não podemos esperar ganho de desempenho desta aplicação com a implementação padrão do Python. Vamos estudar a solução de um problema de Poisson em uma dimensão com o método das diferenças finitas.

Referências

  1. Documentação do módulo threading na biblioteca padrão.

  2. Documentação do módulo concurrent.futures na biblioteca padrão.

  3. Excelente tutorial sobre threads e base desta aula.