Controlando requisições paralelas para N hosts

Tenho a seguinte situação pra debater a respeito, tenho uma lista de hoteis afiliados que disponibilizam uma API Rest para consulta de quartos disponíveis com base nas datas de entrada e saída do hóspede. Vamos pensar que esses endpoints são padronizados e devolvem a response no mesmo formato, sempre.

Tenho uma API que vai receber como parâmetro do usuário as datas de entrada e saída. Em seguida, preciso disparar de uma só vez as requisições para cada um dos endpoints dos afiliados em busca dos quartos disponíveis em cada um deles. Porém, tenho um requisito não funcional no meio do caminho: a requisição que durar mais que 1 minuto pra ter resposta é ignorada. Logo, as N requisições devem ser feitas de forma independentes, ou seja, se uma delas der erro,somente ela é ignorada. Se a requisição demorar mais do que 1 minuto, somente a que demorou é ignorada.

Eis que surge o problema, qual é a melhor forma de trabalhar com um requisito desses? Como definir que o rempo de response deve ser menor ou igual a um minuto? Como disparar N consultas ao mesmo tempo, capturar todas as respostas e montar uma response única pro usuário?

Bom, inicialmente pensei em trabalhar com tópicos JMS, uma vez que a requisição do usuário poderia gerar um Publisher enviando o broadcast pra todos os Receivers, no caso os afiliados. Dessa forma eu mato o problema de reuquisitar todos ao mesmo tempo, além de definir que as mensagens que não forem tratadas em 5 segundos são ignoradas. Mas é possível definir o tempo de processamento da resposta? Como eu pegaria o retorno de cada receiver e montaria um response único para o usuário, ou seja, uma lista contendo todos os quartos dos afiliados xpto. Essa é uma boa arquitetura para este problema?

Um outro ponto de vista me levou a pensar num processamento batch pra esta requisição, mas como nunca usei, sequer imagino se é viável. Porém, as nuances que li sobre o assunto implementado no JEE 7 me parece um pouco dentro do contexto, uma vez que posso ter um JobOperator fazendo o processamento particionado da requisição. Mas volto aos mesmos questionamentos: como tratar o tempo de resposta? eu consigo moldar o chunk para capturar os itens processados durante um checkpoint, por exemplo, e montar uma lista? E no final de tudo isso, como unificar o resultado dos N chunks?

Não espero uma resposta com solução pronta ou coisa do tipo, pois o intuito dessa pergunta é mais fazer um brainstorm e avaliar possíveis arquiteturas para a solução do problema. Quem sabe a partir daí o papo evolua pra dúvidas mais hands-on.

Sendo assim, peço que quem puder contribuir com ideias, mesmo que no final elas não façam tanto sentido, por favor, postem!

Comunidade tá mais parada do que água de sisterna. Bom, consegui elaborar uma abordagem que me atendeu, pelo menos inicialmente. A quem interessar, a mesma é baseada no padrão de integração request/reply utilizando JMS com uma implementação do ActiveMQ.

Fica o link de uma pequena POC que fiz envolvendo queues, topics, durable subscribers e por fim o cenário de request-reply.

Acesse aqui.

cara, vc pensou em usar ExecutorService executor = Executors.newFixedThreadPool(NUMERO DE SITES QUE TEM CONSULTAR) acho que ele resolve seu problema

Não posso amarrar a quantidade de sites. Tenho que levantar a bola pra quem quiser cortar, sacou? Nisso posso ter um host ou n hosts ativos que se interessem em atender a consulta.

Até o momento não econtrei algo mais adequado do que JMS.

vc nao vai arramar, vc vai ter em algum momento a lista de site q vc quer consultar ne, entao o tamanho da lista é quantidade de threads

1 curtida

Na verdade JMS que é fora de cogitação atualmente.

Verdade. Até onde entendi, nao tem nada além do que n threads consumindo API REST, com endereços cadastrados em uma tabela ou algum outro meio que ele deseje. Assim que todas as threads terminarem, monta uma única resposta.

Hmmm, saquei. Mas e com relação ao timeout? É possível dizer ao ExecutorService que ele tem um timeout de 1 minuto, por exemplo, e se o timeout for atingido a thread ser ignorada?

Deixei aberto no enunciado da questão que JMS é uma opção que vislumbrei, não que a discussão tenha que ser focada em JMS.

Até o presente momento a solução em JMS com pattern request-reply atendeu aqui, mas certamente estou em busca de algo que seja mais viável.

Isso no título pode ter desinteressado a maioria. Mas olhando hoje o conteúdo era interessante mesmo.

Realmente, faz sentido! Apesar do enunciado ser aberto, o título deixou específico. Alterado, valeu!

Bom, tirei alguma dúvidas na documentação da classe e me fez bastante sentido.
Primeiro deixo registrado o ponto que me tirou a dúvida quanto ao tratamento de timeout do pool, retirado da própria documentação:

void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

A minha dúvida neste momento está em como a consulta vai se comportar. Por exemplo, digamos que eu tenho 10 afiliados que devem ser executados. Beleza, consigo criar uma classe IS-A Runnable que recebe essa lista e monta o pool com o tamanho certo da lista dos host. Vamos afirmar que cada host segue um contrato que define o comportamento da consulta REST. Lá no meu método run(), como eu disparo todas as consultas de uma vez? Tenho que colocar um for na lista e dar um execute() no pool para cada consulta?

Mais um rascunho finalizado sobre como executar todas as tarefas de uma vez. O metodo invokeAll() me permite passar uma lista de tarefas e manusear o retorno num Future<>.

Se puderem opiniar se é realmene este o caminho, agradeço.

qual o problema com o JMS exatamente ? até entendo que a especificação JMS em si possa estar defasada, mas brokers de mensagens estão sendo muito usados atualmente, principalmente com essa onda de microserviços.

Cara, não é assim que se usa o ExecutorService. Você tem que pensar nele como se fosse um pool de conexões JDBC. Por exemplo, em um server JEE você não cria e destrói pools de conexão. O pool é criado de destruído pelo server. Como o ExecutorService o conceito é parecido, você usa um único ExecutorService para toda a aplicação. Com o ExecutorService você também não precisa criar threads explicitamente, você passa a lógica que deve ser executada encapsulada em Runnable’s ou Callable<V>'s. A diferença entre um e outro é que o segundo permite retornar um resultado. Uma vez que você submeteu a execução, o ExecutorService te retorna um objeto do tipo Future, que você pode usar para recuperar o resultado de maneira síncrona. O uso seria mais ou menos assim:

class ConsultaController{
  @Resource
  private ExecutorService executorService; //se você não usa DI, pode implementar uma factory ou algo do tipo

  public ResultadoAgregado consultar(){
    //dispara n consultas em paralelo 
    Future<Resultado> f1 = executorService.submit( () -> return service.get("http://host1/api") );
    Future<Resultado> f2 = executorService.submit( () -> return service.get("http://host2/api") );
    Future<Resultado> f3 = executorService.submit( () -> return service.get("http://host3/api") );

    //cada chamada a get() bloqueia a thread atual e
    //aguarda pelo resultado da chamada em paralelo
    Resultado r1 = f1.get();
    Resultado r2 = f2.get();
    Resultado r3 = f3.get();

    return new ResultadoAgregado(r1,r2,r3); 
  }
}
2 curtidas

Show de bola!!! E quanto à JSR-352 para processamento em lote, alguém já usou?

Se está trabalhando com esse tipo de sistema distribuído (acessando múltiplas APIs), recomendo dar uma lida no livro Release It, que ilustra vários problemas e soluçoes para esse tipo de sistema (um deles por exemplo, é sempre ter um timeout nessas chamadas).

Tem uma biblioteca do Netflix, chamada Hystrix que implementa algumas soluçoes propostas nesse livro, incluindo esse tipo de query de múltiplos serviços ao mesmo tempo, com a vantagem de trabalhar num nível um pouco mais alto do que a api padrao do java.

1 curtida

Valeu pela dica, vou dar uma olhada.

Se você usar mensagens, o processo todo passa a ser assíncrono. Neste caso não faz mais sentido definir um “tempo de processamento de resposta” ou tentar “montar um response único para o usuário” porque o usuário não vai receber a lista de quartos no response, e sim uma mensagem do tipo “estamos processando sua solicitação, em breve você terá o que pediu…” e você vai ter que arrumar alguma forma de disponibilizar essas informações depois quando elas estiverem disponíveis, seja via push, ou enviando uma email para o usuário com um link.

O timeout você configura na biblioteca HttpClient usada pra se conectar a API do hotel.

1 curtida

Saquei! Faz bastante sentido mesmo. Anteriormente eu estava associando o processo assíncrono somente com a independência de cada requisição e não havia levando em consideração essas variáveis que colocou. Olhando agora, creio que de fato o cenário orquestrado por um ExecutorService faz mais sentido.