AsyncIO – O futuro do Python mudou completamente!

Tradução do artigo original escrito por Yeray Diaz para o hackernoon: AsyncIO for the working Python developer

Eu me lembro do exato momento em que eu pensei, “Uau, isso está lento, aposto
que se eu pudesse paralelizar essas chamadas isso voaria!” e então, 3 dias após,
eu olhei para o meu código e não pude reconhece-lo, havia se transformado em
um misturado de chamadas para threading e funções da biblioteca processing.

Então encontrei o asyncio,
e tudo mudou!


Se você não conhece, asyncio é o novo módulo de concorrência introduzido no
Python 3.4. É projetado para usar coroutines e futures para simplificar a
programação assíncrona e tornar o código tão legível quanto o código síncrono
simplesmente por não haver callbacks.

Eu também me lembro que enquanto eu estava naquela busca pelo paralelismo
inúmeras opções estavam disponíveis, mas uma se destacou. Era rápida, fácil de
aprender e muito bem escrita: A excelente biblioteca
gevent. Eu cheguei até ela lendo o encantador
tutorial mão na massa: Gevent for the Working Python Developer,
escrito por uma sensacional comunidade de usuários, uma ótima introdução não
apenas ao gevent mas ao conceito de concorrência em geral, e você também
deveria dar uma lida.

Eu gostei tanto do tutorial que decidi usa-lo como template para escrever
sobre o AsyncIO.

Aviso Rápido: Isto não é um artigo sobre gevent X asyncio, O Nathan Road
escreveu a respeito
das diferenças e similaridades entre esses 2 se você estiver interessado.

Uma nota a respeito do código neste tutorial, você deve ter lido que no Python 3.5
uma nova sintaxe foi introduzida,
especialmente para coroutines, eu estou intencionalmente não utilizando esta
nova sintaxe neste texto pois desta forma acredito que fica mais fácil para
assimilar as coroutinas com generators. Mas você pode encontrar versões dos
exemplos usando esta nova sintaxe no github.

Eu sei que você já deve estar ansioso mas antes de mergulharmos eu gostaria de
primeiramente falar rapidamente sobre alguns conceitos que talvez não lhe sejam
familiares.

Threads, loops, coroutines and futures

Threads são uma ferramenta comum e a maioria dos desenvolvedores já ouviu falar
ou já usou. Entretanto o asyncio usa de estruturas um pouco diferentes:
event loops, coroutines e futures.

  • Um event loop gerencia e distribui a execução de diferentes tarefas. Ele
    mantém um registro de tarefas (coroutines) e distribui o fluxo de execução entre elas.
  • As coroutines são geradores Python (generators), que quando há a ocorrência
    do yield libera o controle do fluxo de volta ao event loop.
    Uma coroutine precisa estar programada para ser executada usando
    o event loop, para fazer isso criamos uma tarefa do tipo future.
  • E um future é um objeto que representa o resultado de uma tarefa que pode,
    ou não, ter sido executada. Este resultado pode ser uma Exception.

Entendeu? simples né? vamos mergulhar neste conceito!

Execução síncrona e Execução assíncrona

Em Concorrência não é paralelismo, é melhor!
o Rob Pike falou uma coisa que fez um click na minha cabeça:
Dividir tarefas em sub-tarefas concorrentes já é o suficiente para permitir
o paralelismo. Mas é o fato de programar/agendar a execução dessas
sub-tarefas que realmente cria o paralelismo.

O ASyncIO faz exatamente isso, você pode estruturar o seu código em
sub-tarefas definidas como coroutines e isso te permite programar a execução
da maneira que desejar, incluindo a forma simultânea. As Corountines
contém pontos de vazão demarcados com a palavra yield onde definimos
onde uma troca de contexto poderia ocorrer caso existam
outras tarefas pendentes, mas que não irá ocorrer caso não existam
outras tarefas.

Nota do tradutor: Em uma loja de doces há um funcionário empacotando balas,
ao finalizar cada pacote ele o lacra e coloca na vitrine (YIELD),
então ele dá uma olhada no balcão para ver se tem algum cliente para ser
atendido, se tiver um cliente, então ele para de empacotar balas
atende o pedido do cliente (troca de contexto). E só depois de terminar de > atender o cliente ele então volta a empacotar as balas, caso não tenha cliente
a ser atendido ele simplesmente continua o trabalho de empacotamento.
Podemos dizer que é um funcionário fazendo duas tarefas __. (responda
nos comentários se é paralelamente ou concorrentemente)

Uma troca de contexto no asyncio representa o event loop passando o fluxo
de controle da coroutine em execução para a próxima na fila de execução,
ou seja, (Yielding).

Veja um exemplo básico:

import asyncio

@asyncio.coroutine
def empacotar_bala():
    print("Empacotando balas...")

    # parada para verificar se tem cliente no balcão
    yield from asyncio.sleep(0)

    # troca de contexto
    print("Explicitamente voltando a empacotar balas")


@asyncio.coroutine
def atender_balcao():
    print("Explicitamente verificando se tem cliente no balcão...")

    yield from asyncio.sleep(0)

    print("Voltando a empacotar as balas")


ioloop = asyncio.get_event_loop()  # Event Loop

tasks = [ioloop.create_task(empacotar_bala()),
         ioloop.create_task(atender_balcao())]

wait_tasks = asyncio.wait(tasks)

ioloop.run_until_complete(wait_tasks)

ioloop.close()

Execute:

$ python3 async1.py
Empacotando balas...
Explicitamente verificando se tem cliente no balcão...
Explicitamente voltando a empacotar balas
Voltando a empacotar as balas
  • Primeiramente nós declaramos duas tarefas simples com a intenção de serem
    executadas de maneira não bloqueante pois usamos a função sleep do asyncio.
  • Coroutines só podem ser chamadas por outras coroutines ou podem ser
    agrupadas em uma task para então serem enfileiradas, nós usamos a função
    create_task para fazer isso.
  • Então criamos lista contendo as 2 tasks e nós a combinamos em uma
    wait que é uma task que irá aguardar até que todas as tarefas
    enfileiradas terminem.
  • E finalmente nós programamos a wait para executar usando o event loop
    usando a função run_until_complete.

Ao usar yield from na coroutine empacotar_bala nós declaramos que a
coroutine pode naquele momento passar o controle do fluxo de execução de volta
para o event loop, neste caso o sleep ao terminar (note o sleep(0))
irá devolver o controle ao event loop que irá mudar de contexto,
passando o controle de fluxo para a próxima coroutine agendada para execução:
atender_balcao

Nota: O tradutor alterou os nomes das funções dos exemplos do artigo original
para dar um significado mais fácil de ser interpretado em português mas mantendo
a semântica e fluxo de execução dos códigos, todavia os originais estão no
github.

Vamos agora simular duas tarefas bloqueantes gr1 e gr2, considere que
há dois requests para serviços externos. Enquanto elas executam, uma terceira
tarefa pode ser executada assíncronamente, como no seguinte exemplo:

import time
import asyncio

start = time.time()

def tic():
  return 'at %1.1f segundos' % (time.time() - start)


@asyncio.coroutine
def gr1():
  # Demora a ser executada, mas não queremos esperar
  print('gr1 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr1 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr2():
  # Demora a ser executada, mas não queremos esperar
  print('gr2 iniciou a execução: {}'.format(tic()))
  yield from asyncio.sleep(2)
  print('gr2 terminou a execução: {}'.format(tic()))


@asyncio.coroutine
def gr3():
  print('Executando enquanto as outras estão bloqueadas: {}'.format(tic()))
  yield from asyncio.sleep(5)
  print('Pronto!')

ioloop = asyncio.get_event_loop()
tasks = [
    ioloop.create_task(gr1()),
    ioloop.create_task(gr2()),
    ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()

Execute:

$ python3 async2.py 
gr1 iniciou a execução: at 0.0 segundos
gr2 iniciou a execução: at 0.0 segundos
Executando enquanto as outras estão bloqueadas: at 0.0 segundos
gr1 terminou a execução: at 2.0 segundos
gr2 terminou a execução: at 2.0 segundos
Pronto!

Perceba que na forma que o I/O loop faz o gerenciamento e programa a execução permite
que o seu código, rodando em single thread possa operar de forma concorrente.
Enquanto duas tarefas estavam bloqueadas uma terceira pode tomar o controle
do fluxo de execução e ser executada de maneira assíncrona.

Ordem de execução

No mundo síncrono estamos acostumados a pensar de maneira linear. Se nós
tivermos uma lista de tarefas que consomem diferente quantidade de tempo
elas serão executadas na ordem em que foram chamadas.

Porém, quando usamos concorrência nós precisamos estar cientes de que as
tarefas terminam em tempos que diferem da ordem em que foram enfileiradas.

import random
from time import sleep
import asyncio


def task(pid):
    """Uma tarefa não deterministica"""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


@asyncio.coroutine
def task_coro(pid):
    """Uma tarefa deterministica"""
    yield from asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s terminada' % pid)


def synchronous():
    for i in range(1, 10):
        task(i)


@asyncio.coroutine
def asynchronous():
    tasks = [asyncio.async(task_coro(i)) for i in range(1, 10)]
    yield from asyncio.wait(tasks)


print('Síncronamente:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Assíncronamente:')
ioloop.run_until_complete(asynchronous())

ioloop.close()

Execute:

$ python3 async3.py 
Síncronamente:
Task 1 terminada
Task 2 terminada
Task 3 terminada
Task 4 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 8 terminada
Task 9 terminada

Assíncronamente:
Task 2 terminada
Task 4 terminada
Task 8 terminada
Task 5 terminada
Task 6 terminada
Task 7 terminada
Task 9 terminada
Task 1 terminada
Task 3 terminada

A saida será com certeza variada, pois cada task espera por uma quantidade
randômica de tempo, mas repare que a ordem dos resultados é completamente
diferente, mesmo tendo enfileirado em uma lista de tarefas na mesma ordem
usando o mesmo range.

Outro detalhe é que tivemos que uma versão em coroutine da nossa simples
função. É importante entender que o asyncio não faz com que as coisas se
transformarem magicamente em não bloqueantes.

O AsyncIO está por enquanto sozinho na biblioteca padrão do Python 3 enquanto
todos outros módulos oferecem apenas funcionalidades bloqueantes.

Você pode usar o módulo concurrent.futures para agrupar tarefas
bloqueantes em uma thread ou um processo e então retornar um Future que
o asyncio pode utilizar. Os mesmos exemplos utilizando threads podem ser
encontrados no github

Esta é provavelmente a maior desvantagem ao usar asyncio neste momento,
porém existe uma série de bibliotecas para diferentes tarefas e serviços que
já estão disponíveis de maneira não bloqueante.


Uma tarefa bloqueante bastante comum é coletar dados de um serviço HTTP.
Para isso vou usar a excelente biblioteca aiohttp que efetua chamadas não bloqueantes a serviços HTTP.
Neste exemplo vamos coletar dados da API pública do Github e ler apenas o valor
de Date do responde header.

import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    print('Captura síncrona {} iniciou'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    return datetime


@asyncio.coroutine
def fetch_async(pid):
    print('Captura assíncrona {} iniciou'.format(pid))
    start = time.time()
    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime


def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


@asyncio.coroutine
def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    yield from asyncio.wait(tasks)
    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


print('Sincrono:')
synchronous()

print('Assíncrono:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 -V
Python 3.4.4+

$ pip3 install aiohttp

$ python3 async4.py
Sincrono:
Processo sincrono 1 iniciou
Processo 1: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.54 segundos
Processo sincrono 2 iniciou
Processo 2: Wed, 17 Feb 2016 13:10:11 GMT, demorou: 0.50 segundos
Processo sincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.48 segundos
Process demorou: 1.54 segundos

Assíncrono:
Processo assincrono 1 iniciou
Processo assincrono 2 iniciou
Processo assincrono 3 iniciou
Processo 3: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.50 segundos
Processo 2: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.52 segundos
Processo 1: Wed, 17 Feb 2016 13:10:12 GMT, demorou: 0.54 segundos
Processo demorou: 0.54 segundos

Nota: requer Python 3.4.4+ caso contrário cairá em exception

Primeiramente, repare na diferença de tempo, usando chamadas assíncronas
nós efetuamos as requisições ao serviço HTTP exatamente ao mesmo tempo (13:10:12).
Como falado anteriormente, cada requisição passou (yield) o fluxo de controle
para a próxima e retornou quando foi completada.

Resultando no fato de que requisitar e capturar os resultados de todos as
tarefas demorou o mesmo tempo que a requisição mais lenta! Veja o tempo no log
0.54 segundos para a requisição mais lenta (processo 1) e é exatamente o mesmo
tempo que se passou para processar todos os 3 requests, Legal né? (enquanto a
parte de I/O daquela tarefa lenta estava bloqueada, as outras puderam ser
executadas simultaneamente).

Agora veja como o código é similar ao da versão síncrona! é praticamente o mesmo
código! As diferenças principais são por conta das diferenças de implementações
das bibliotecas usadas e a parte da criação das tasks e a espera para elas
terminarem.

Criando concorrência

Até então estamos usando uma única abordagem de criar e requisitar resultados de
coroutines, criar uma lista de tasks e esperar que elas terminem.

Mas as coroutines podem ser programadas para serem executadas e requisitar
resultados em maneiras diferentes. Imagine um cenário onde precisamos processar
os resultados de uma chamada HTTP GET assim que ela é requisitada, o processo
é na verdade similar ao que fizemos no exemplo anterior.

import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


@asyncio.coroutine
def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Processo assincrono {} iniciou, esperando por {} segundos'.format(
        pid, sleepy_time))

    yield from asyncio.sleep(sleepy_time)

    response = yield from aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    response.close()
    return 'Processo {}: {}, demorou: {:.2f} segundos'.format(
        pid, datetime, time.time() - start)


@asyncio.coroutine
def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = yield from future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Processo demorou: {:.2f} segundos".format(time.time() - start))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute

$ python3 async5.py

Processo assincrono 1 iniciou, esperando por 4 segundos
Processo assincrono 3 iniciou, esperando por 5 segundos
Processo assincrono 2 iniciou, esperando por 3 segundos
>> Processo 2: Wed, 17 Feb 2016 13:55:19 GMT, demorou: 3.53 segundos
>>>> Processo 1: Wed, 17 Feb 2016 13:55:20 GMT, demorou: 4.49 segundos
>>>>>> Processo 3: Wed, 17 Feb 2016 13:55:21 GMT, demorou: 5.48 segundos
Processo demorou: 5.48 segundos

Repare no deslocamento >> e no tempo de cada chamada, elas foram
programadas ao mesmo tempo, os resultados chegam fora de ordem e são
processados assim que chegam.

Este código é um pouco diferente, estamos agrupando as coroutines em uma lista,
cada uma para ser agendada e executada. a função as_completed retorna um
iterador que irá gerar (YIELD) um future completo assim que a tarefa
estiver terminada. Muito legal né? aliás, as funções as_completed e wait
são ambas originalmente parte do modulo concurrent.futures


Vamos pegar um outro exemplo, imagine que você está tentando consultar o seu
endereço de IP público atual em seu programa. Existem alguns serviços que fornecem
essa informação mas você não tem certeza se estarão acessíveis no momento da
execução. Você não quer checar cada um deles sequencialmente, então, é
preferível efetuar requisições concorrentes para cada um dos serviços e
utilizar aquele que responder mais rapidamente, ok? Ok!

Bom, acontece que nosso velho amigo wait recebe o parâmetro return_when
para fazer exatamente isso. Estávamos ignorando o dado retornado pelo wait
já que estávamos preocupados apenas em paralelizar as tarefas. Mas agora nós
queremos pegar os resultados da coroutine, então usaremos dois conjuntos de futures, done e pending.

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = yield from aiohttp.request('GET', service.url)
    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Execute:

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>

Espere, o que aconteceu aqui? O primeiro serviço respondeu com sucesso então
o que são esses warnings?

Bem, nós agendamos duas tarefas mas não permitimos que as duas fossem
completadas. o AsyncIO considera que é um bug e imprime um warning. Então
queremos informar ao event loop que este era um comportamento esperado
e não se preocupar com a segunda tarefa que ficou pendente. Como? que bom que
perguntou.

Estados do Futuro

(sobre o estado que um future está atualmente, não o estado que ele estará
no futuro… você entendeu né!)

Eles são:

  • Pending
  • Running
  • Done
  • Cancelled

Simples assim, quando um future finaliza sua tarefa, esta tarefa irá retornar
o resultado de volta para o future, se estiver em pending ou cancelled
ele gera o erro InvalidStateError ou CancelledError, e finalmente
se a coroutine gera o erro ele será re-gerado, o que significa o mesmo
comportamento ao chamar exception. confira aqui

Você também pode usar .done, .cancelled e .running em uma Future
para obter um booleano indicando o estado. Note que done significa simplesmente
que o result irá retornar ou gerar um erro. Você pode explicitamente cancelar
um Future chamando o método cancel, e isso é o que precisamos para
resolver o warning anterior.



def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())

    for future in pending:
        future.cancel()

e então

$ python3 async6.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api terminou com resultado: 82.34.76.170, demorou: 0.09 segundos

Um bom resultado né!

Futures permitem injetar callbacks a serem executados quando entrarem
no estado done caso você queira adicionar uma lógica adicional, ou se você
não quiser usar o yield from e prefira o callback hell, (sério quer mesmo?)

E você pode, para objetivos de unit-testing manualmente injetar o resultado ou
uma exception a um Future.

Gerenciamento de erros

O AsyncIO é sobre fazer código concorrente gerenciável e legível, e isto
se torna óbio no que diz respeito ao tratamento de erros. Vamos voltar ao
exemplo anterior para ilustrar isso.

Imagine que queremos garantir que os 2 serviços retornaram o mesmo resultado,
mas um dos serviços fica offline e não responde, podemos usar apenas o usual
try… except

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        print(future.result())


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async7.py
Fetching IP from ip-api
Fetching IP from borken
Fetching IP from ipify
ip-api terminou com o resultado: 85.133.69.250, demorou: 0.75 segundos
ipify terminou com o resultado: 85.133.69.250, demorou: 1.37 segundos
borken não está respondendo

Também podemos tratar os erros enquanto processamos os resultados dos Futures
em caso de algo não esperado acontecer (lembra que eu disse que os erros são
re-gerados na coroutine).

from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
import traceback

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('broken', 'http://este-servico-nao-funciona', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return "{} não está respondendo".format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} terminou com resultado: {}, demorou: {:.2f} segundos'.format(
        service.name, ip, time.time() - start)


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = yield from asyncio.wait(futures)

    for future in done:
        try:
            print(future.result())
        except:
            print("Erro não esperado: {}".format(traceback.format_exc()))


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

Veja:

$ python3 async7.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
ipify terminou com o resultado: 85.133.69.250, demorou: 0.91 segundos
borken não está respondendo
Erro não esperado: Traceback (most recent call last):
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 41, in asynchronous
 print(future.result())
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py”, line 274, in result
 raise self._exception
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3b-fetch-ip-addresses-future-exceptions.py”, line 27, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Da mesma forma que agendar uma task e não esperar que ela termine é considerado
um bug, agendar uma task e não recuperar possíveis erros também irá gerar um
warning.

from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        print('{} is unresponsive'.format(service.name))
    else:
        json_response = yield from response.json()
        ip = json_response[service.ip_attr]

        response.close()
        print('{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start))


@asyncio.coroutine
def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    yield from asyncio.wait(futures)  # intentionally ignore results


ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

execute:

$ python3 async8.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
borken is unresponsive
ipify finished with result: 85.133.69.250, took: 0.78 seconds
Task exception was never retrieved
future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError(‘this-is-not-an-attr’,)>
Traceback (most recent call last):
 File “/usr/local/Cellar/python3/3.5.0/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py”, line 239, in _step
 result = coro.send(value)
 File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 26, in fetch_ip
 ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’

Se parece muito com a saída do exemplo anterior, mas não contém os mínimos detalhes e mensagens do asyncio.

Timeouts

E se nós não nos importarmos muito com o nosso IP? Imagine que é apenas um
adicional ao nosso serviço, mas não importante, não queremos que o usuário
fique esperando por este dado. Idealmente nós definimos um time-out para nossas
tarefas não bloqueantes, e então continuamos nosso programa sem o atributo do IP
já que neste exemplo não é tão importante.

De novo descobrimos que wait tem o atributo que precisamos:

import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01


@asyncio.coroutine
def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    yield from asyncio.sleep(random.randint(1, 3) * 0.1)
    try:
        response = yield from aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = yield from response.json()
    ip = json_response[service.ip_attr]

    response.close()
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip


@asyncio.coroutine
def asynchronous(timeout):
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = yield from asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()

    for future in done:
        response["ip"] = future.result()

    print(response)


parser = argparse.ArgumentParser()
parser.add_argument(
    '-t', '--timeout',
    help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
    default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()

print("Using a {} timeout".format(args.timeout))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous(args.timeout))
ioloop.close()

Repare no argumento timeout, também adicionamos um parâmetro de linha de comando para testar mais facilmente o que acontecerá se deixarmos a requisição
ocorrer algumas vezes. Também adicionei um tempo randomico de slepp só para garantir que as coisas não aconteçam tão rápido que a gente nem perceba.

$ python async8.py

Using a 0.01 timeout
Fetching IP from ipify
Fetching IP from ip-api
{‘message’: ‘Result from asynchronous.’, ‘ip’: ‘not available’}

$ python async8.py -t 5
Using a 5.0 timeout
Fetching IP from ip-api
Fetching IP from ipify
ipify finished with result: 82.34.76.170, took: 1.24 seconds
{'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}

Conclusão

Asyncio aumentou meu já enorme amor por Python. Para ser absolutamente honesto
eu me apaixonei pelas coroutines em Python quando conheci o Tornado mas o asyncio conseguiu unir o melhor desta abordagem junto com
excelentes bibliotecas de concorrência. E muito foi feito para que outras bibliotecas possam usar o IO loop, então se você está usando o Tornado, você também pode usa-lo com bibliotecas feitas para o asyncio!

E como eu disse anteriormente o maior problema por enquanto é a falta de bibliotecas e módulos que implementam o comportamento não bloqueante.
Você pode achar que uma vasta quantidade de tecnologias já estabelecidas ainda
não tenham uma versão não bloqueante para interagir com asyncio, ou que as existentes ainda estão jovens ou experimentais. Porém, O numero de bibliotexas está crescendo diariamente

confira este repo: https://github.com/aio-libs

Espero neste tutorial ter passado a idéia de como é prazeroso trabalhar com
AsyncIO. Eu honestamente penso que isto é a peça que finalmente nos levará
a adoção em massa e adaptação ao Python 3, essas são as coisas que você está
perdendo se ficar parado no Python 2.7.

Uma coisa é certa, o Futuro do Python mudou completamente! (trocadilho intencional)