Alguem consegue me explicar o codigo abaixo

olá,
estava pesquisando sobre programação multithread em java e lendo aqui e ali achei o seguitne codigo
alguem sabe o q o cara tentou fazer?

public class TaskThreader {
    class DoStuff implements Callable {
       Object in;
       public Object call(){
         in = doStep1(in);
         in = doStep2(in);
         in = doStep3(in); 
         return in;
       }
       public DoStuff(Object input){
          in = input;
       }
    }

    public abstract Object doStep1(Object input);    
    public abstract Object doStep2(Object input);    
    public abstract Object doStep3(Object input);    

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        ArrayList<Callable> tasks = new ArrayList<Callable>();
        for(Object input : inputs){
           tasks.add(new DoStuff(input));
        }
        List<Future> results = exec.invokeAll(tasks);
        exec.shutdown();
        for(Future f : results) {
           write(f.get());
        }
    }
}

Cara, é simples.
Esse é um exemplo de uso das interfaces Callable e Future.

Primeiro ele cria uma classe que chamou de TaskThreader que implementa Callable. Essa interface é semelhante a Runnable, mas a diferença é que ela possui retorno. Analogamente, a grosso modo, é como se você criasse uma thread com retorno.

Prosseguindo, o ExecutorService é um cara que administra a execução dessas threads com retorno. O argumento que é passado pra ele indica a quantidade de threads paralelas que ele pode executar por vez. No caso está indicando que ele pode executar uma thread por núcleo que a jvm tem acesso.
Por exemplo, se o cara tiver 4 núcleos e enviar 80 objetos tasks o ExecutorService vai executar em blocos de 4 taks até finalizar todos.

Prosseguindo, quando ele passa ao ExecutorService quais tasks ele tem que executar pra cada task ele retorna um Future (logo, se ele indicou uma lista de Callable ele terá uma lista de igual tamanho de Futures).
Ok, mas você me pergunta: como recuperar o retorno de cada task sendo que são executadas assincronamente e sabe-se la em qual ordem?
Ótima pergunta!

A interface Future fornece o método get() que te devolve esse retorno.
Beleza, mas você me pergunta de novo: ok, entendi, cada Callable terá um Future associado mas e se eu chamar o get() do Future relacionado a uma Callable que ainda não terminou de executar?
Ótima pergunta novamente, quando esse caso acontece a execução fica parada até que o retorno do get() esteja disponível, ou seja, até a Callable associada tenha sido inteiramente executada.

Ficou claro? Há outros método, mas basta verificar o javadoc pra esclarecer o que fazem, tendo essa explicação fica simples de entender.
Qualquer dúvida escreve ai.

Esse tipo de recurso é muito utilizado em processamento em batch, filas de mensagem, filas de execução.

Tópicos recomendados:
1- Estudar o funcionamento de BlockingQueue’s (leitura obrigatória!)
2- Já ouviu falar em Scala? Veja o recurso Actors Model! (opcional)

É isso ai!
Abraços!

[quote=Tchello] Cara, é simples.
Esse é um exemplo de uso das interfaces Callable e Future.

Primeiro ele cria uma classe que chamou de TaskThreader que implementa Callable. Essa interface é semelhante a Runnable, mas a diferença é que ela possui retorno. Analogamente, a grosso modo, é como se você criasse uma thread com retorno.

Prosseguindo, o ExecutorService é um cara que administra a execução dessas threads com retorno. O argumento que é passado pra ele indica a quantidade de threads paralelas que ele pode executar por vez. No caso está indicando que ele pode executar uma thread por núcleo que a jvm tem acesso.
Por exemplo, se o cara tiver 4 núcleos e enviar 80 objetos tasks o ExecutorService vai executar em blocos de 4 taks até finalizar todos.

Prosseguindo, quando ele passa ao ExecutorService quais tasks ele tem que executar pra cada task ele retorna um Future (logo, se ele indicou uma lista de Callable ele terá uma lista de igual tamanho de Futures).
Ok, mas você me pergunta: como recuperar o retorno de cada task sendo que são executadas assincronamente e sabe-se la em qual ordem?
Ótima pergunta!

A interface Future fornece o método get() que te devolve esse retorno.
Beleza, mas você me pergunta de novo: ok, entendi, cada Callable terá um Future associado mas e se eu chamar o get() do Future relacionado a uma Callable que ainda não terminou de executar?
Ótima pergunta novamente, quando esse caso acontece a execução fica parada até que o retorno do get() esteja disponível, ou seja, até a Callable associada tenha sido inteiramente executada.

Ficou claro? Há outros método, mas basta verificar o javadoc pra esclarecer o que fazem, tendo essa explicação fica simples de entender.
Qualquer dúvida escreve ai.

Esse tipo de recurso é muito utilizado em processamento em batch, filas de mensagem, filas de execução.

Tópicos recomendados:
1- Estudar o funcionamento de BlockingQueue’s (leitura obrigatória!)
2- Já ouviu falar em Scala? Veja o recurso Actors Model! (opcional)

É isso ai!
Abraços![/quote]
Ótima resposta. Bem clara e direta.

Olá,

Muito boa reposta, so estou com umk problema não consegui fazer o codigo rodar :frowning: , vc teria alguma aplicação ai de exemplo? isso de exeutar por nucleo é exatamento o eu preciso, so espero q consiga rodar isso com java 1.4 pois e a JVm q tenho no AIX…

[quote=JJjava]Olá,

Muito boa reposta, so estou com umk problema não consegui fazer o codigo rodar :frowning: , vc teria alguma aplicação ai de exemplo? isso de exeutar por nucleo é exatamento o eu preciso, so espero q consiga rodar isso com java 1.4 pois e a JVm q tenho no AIX…[/quote]

Infelizmente o pacote java.util.concurrent só está disponível a partir do Java 5.

No AIX, ou você chora, ou então usa um back-port que foi feito para o Java 1.4:

http://altair.cs.oswego.edu/pipermail/concurrency-interest/2004-September/001035.html

O código mesmo está no SourceForge:

http://backport-jsr166.sourceforge.net/

A propósito, o programa está chamando “shutdown” antes do adequado. O correto seria:

        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());  
        ArrayList<Callable> tasks = new ArrayList<Callable>();  
        for(Object input : inputs){  
           tasks.add(new DoStuff(input));  
        }  
        List<Future> results = exec.invokeAll(tasks);  
        for(Future f : results) {  
           write(f.get());  
        }  
        exec.shutdown();  

Só não houve grandes problemas, porque não foi usada a versão do shutdown que força as tarefas a terminarem abruptamente enviando uma InterruptedException para cada uma delas.

[quote=entanglement]A propósito, o programa está chamando “shutdown” antes do adequado. O correto seria:

        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());  
        ArrayList<Callable> tasks = new ArrayList<Callable>();  
        for(Object input : inputs){  
           tasks.add(new DoStuff(input));  
        }  
        List<Future> results = exec.invokeAll(tasks);  
        for(Future f : results) {  
           write(f.get());  
        }  
        exec.shutdown();  

Só não houve grandes problemas, porque não foi usada a versão do shutdown que força as tarefas a terminarem abruptamente enviando uma InterruptedException para cada uma delas.
[/quote]

estou tendo problema na linha List results = exec.invokeAll(tasks); diz q o parametro esta incorreto…

[quote=entanglement]A propósito, o programa está chamando “shutdown” antes do adequado. O correto seria:

        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());  
        ArrayList<Callable> tasks = new ArrayList<Callable>();  
        for(Object input : inputs){  
           tasks.add(new DoStuff(input));  
        }  
        List<Future> results = exec.invokeAll(tasks);  
        for(Future f : results) {  
           write(f.get());  
        }  
        exec.shutdown();  

Só não houve grandes problemas, porque não foi usada a versão do shutdown que força as tarefas a terminarem abruptamente enviando uma InterruptedException para cada uma delas.
[/quote]

Você tem razão, mas nesse caso a ordem do shutdown tanto faz (desde que seja após a inclusão de todas as tasks desejadas).
O que ele faz é aguardar as taks inseridas serem executadas e após isso finaliza o ExecutorService.

Alguem consegiu fazer esse exemplo rodar?

o netbeans pedi para tranformar a classe em classe abstrata, e depois fica dando erro na chamada do exec.invokeAll();

pelo q vejo o parametro esta errado ArrayList tasks = new ArrayList();
o metodo pedi (Collection<? extends Callable> tasks) eu nao estou entendo oq ele pedi, pelo q entendi uma collection (ArrayList) de objetos q extendao Callabe , mas tem Callable isso seria uma nova coleção?

[quote=JJjava]Alguem consegiu fazer esse exemplo rodar?

o netbeans pedi para tranformar a classe em classe abstrata, e depois fica dando erro na chamada do exec.invokeAll();

pelo q vejo o parametro esta errado ArrayList tasks = new ArrayList();
o metodo pedi (Collection<? extends Callable> tasks) eu nao estou entendo oq ele pedi, pelo q entendi uma collection (ArrayList) de objetos q extendao Callabe , mas tem Callable isso seria uma nova coleção?

[/quote]

Cara, uma coisa de cada vez.
Antes de construir uma casa é feita toda a fundação dela, concorda?

Você não sabe nem generics e quer executar esse exemplo.

Pronto, já sabe por onde começar: estude generics.

[quote=Tchello][quote=JJjava]Alguem consegiu fazer esse exemplo rodar?

o netbeans pedi para tranformar a classe em classe abstrata, e depois fica dando erro na chamada do exec.invokeAll();

pelo q vejo o parametro esta errado ArrayList tasks = new ArrayList();
o metodo pedi (Collection<? extends Callable> tasks) eu nao estou entendo oq ele pedi, pelo q entendi uma collection (ArrayList) de objetos q extendao Callabe , mas tem Callable isso seria uma nova coleção?

[/quote]

Cara, uma coisa de cada vez.
Antes de construir uma casa é feita toda a fundação dela, concorda?

Você não sabe nem generics e quer executar esse exemplo.

Pronto, já sabe por onde começar: estude generics.[/quote]
:frowning:

Você vai usar Java 1.4, não? Então não dá para usar o exemplo com casca e tudo. Vai ter de baixar o tal pacote de http://backport-jsr166.sourceforge.net/
O jeito de escrever os programas também é um pouco diferente, porque não existe generics em Java 1.4.

O tal exemplo que você passou também está um pouco incompleto, porque está envolvendo o uso de uma classe abstrata. É lógico que você tem de completar seu exemplo criando uma classe concreta, que estende a classe abstrata. Se for rodar esse exemplo no NetBeans, usando Java 6.0, acredito que o código seja um pouco diferente. Vou tentar completar o exemplo aí para você.

/**
 * 
 */
package guj;

import java.util.concurrent.Callable;

/**
 *
 */
public abstract class TaskThreader {
    class DoStuff implements Callable<Object> {
        Object in;

        public Object call() {
            in = doStep1(in);
            in = doStep2(in);
            in = doStep3(in);
            return in;
        }

        public DoStuff(Object input) {
            in = input;
        }
    }

    public abstract Object doStep1(Object input);

    public abstract Object doStep2(Object input);

    public abstract Object doStep3(Object input);

}
package guj;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcreteTaskThreader extends TaskThreader {

    /**
     * Este passo pega o objeto 'input' e retorna o objeto java.lang.Class
     * referente à classe desse objeto
     */
    @Override
    public Object doStep1(Object input) {
        return input.getClass();
    }

    /**
     * Este passo verifica se a entrada é um objeto java.lang.Class, e se
     * for, retorna o nome da classe. Se não for, retorna null
     */
    @Override
    public Object doStep2(Object input) {
        if (input instanceof java.lang.Class) {
            return ((Class) input).getName();
        }
        return null;
    }

    /** 
     * Este passo verifica se o objeto é uma String, e se for,
     * retorna o tamanho dessa string (em um java.lang.Integer).
     * Se não for, retorna null
     */
    @Override
    public Object doStep3(Object input) {
        if (input instanceof String)
            return Integer.valueOf(((String) input).length());
        return null;
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
        List<Object> inputs = Arrays.asList(new Object[] {
            new Integer(1234), new String("BLAH"), new BigInteger("777777777")
        });
        ConcreteTaskThreader ctt = new ConcreteTaskThreader();
        for (Object input : inputs) {
            tasks.add(ctt.new DoStuff(input));
        }
        List<Future<Object>> results = exec.invokeAll(tasks);
        for (Future<Object> f : results) {
            System.out.println(f.get());
        }
        exec.shutdown();
    }

}

O código acima, após execução, pode mostrar algo como:

17
16
20

Tem como verificar em qual processador ta rodando?

[quote=JJjava]Tem como verificar em qual processador ta rodando?

[/quote]
ninguem ???

Não sei dizer. Se houver é algo específico do sistema operacional. No linux estude o funcionamento do comando “ps”.

Mas qual diferença faz saber em qual núcleo ta rodando?
Qual o propósito?

cliente q se acha intendido… pra mim e transparente…