[Resolvido] VRaptor: Concorrência - ExecutorService + @ApplicationScoped

Olá,

Estou trabalhando em um sistema que possui a necessidade de executar tarefas em background.
Eu poderia usar Quartz ou TaskScheduler, porém, para as necessidades da minha aplicação, um simples ExecutorService construído com o método http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool() é mais do que o suficiente.
A idéia é ter um desses em escopo de aplicação e usar o método submit com Callables e Runnables.
A lógica das Tasks sempre será independente de variáveis compartilhadas, logo, não terei grandes problemas de sincronização.
A dúvida é: Basta eu construir uma classe Wrapper para o ExecutorService anotá-la com @ApplicationScoped e @Component, sair injetando esse componente via construtor, e chamando o método submit? Tem algum problema com essa abordagem? (Estou recebendo uns NullPointersExceptions quando tento injetar Daos nas tarefas)
Ou uma abordagem com ComponentFactory é melhor (para eu poder incluir lógica para destruir o ExecutorService, etc…)?
Eu Preciso sincronizar a chamada para o método submit no Wrapper ou ExecutorService?

Abraços,

olá,

Teoricamente sim, é só criar uma classe app scoped.

as tarefas que vc vai executar no seu Wrapper são componentes também? onde estão acontecendo os null pointers?

aí depende se vc quer usar a api de concorrência diretamente ou se vc quer abstrair para algo mais próximo da sua aplicação.
Ambas as abordagens são válidas (embora o wrapper te dê mais controle)

depende. Se vc estiver delegando pro ExecutorService, ele já se encarrega da concorrência, vc não precisa se preocupar com isso.

Lucas,

Muito obrigado pela pronta resposta.

Sim, eu tenho uma série de Daos anotados com @Component. Consigo injetá-los nos Controllers sem problemas.
Já nas tarefas eu recebo NullPointerExceptions (eu não sei ainda se nos componentes em si ou em sessões do Hibernate, estou debuggando agora).

public CircuitoServiceImpl(AnaService anaService, CircuitoAnaService circuitoAnaService, TemplateService templateService, CircuitoDAO circuitoDAO, VpnDAO vpnDAO, LicensaDAO licensaDAO, Agendador agendador, PeDAO peDAO, HistoricoDAO historicoDAO, RdDAO rdDAO, VpnSpokeDAO vpnSpokeDAO) { this.circuitoDAO = circuitoDAO; this.vpnDAO = vpnDAO; this.licensaDAO = licensaDAO; this.historicoDAO = historicoDAO; this.anaService = anaService; this.agendador = agendador; this.peDAO = peDAO; this.rdDAO = rdDAO; this.vpnSpokeDAO = vpnSpokeDAO; this.circuitoAnaService = circuitoAnaService; this.templateService = templateService; }

Será que é por causa do @ApplicationScopped?
Dá para eu deixar o Wrapper @ApplicationScopped (para que não se crie mais de um ExecutorService), porém, as tarefas sem essa anotação?

a tarefa é sempre a mesma? ou vc precisa customizar a tarefa de algum jeito em cada caso?

se ela for sempre a mesma, pode deixá-la como @Component só (@RequestScoped). Pelo que eu entendi vc vai agendá-la várias vezes, certo?

se ela for diferente, vc não precisa que ela seja um @Component, ou vc pode criar um @Component que fabrica a tarefa de acordo com algum parâmetro.

Vão ser várias tarefas tipo ExecutarCircuito implements Runnable e ExecutarTemplate implements Runnable
Geralmente as tarefas recebem um Pojo ou Id qualquer. Para não misturar as coisas, atualmente estou passando via setters

  private Circuito circuito;
	/**
	 * @return the circuito
	 */
	public Circuito getCircuito() {
		return circuito;
	}

	/**
	 * @param circuito the circuito to set
	 */
	public void setCircuito(Circuito circuito) {
		this.circuito = circuito;
	}

Os pojos chegam certinhos, já as Daos injetadas por construtor estão disparando NullPointerExceptions.

Isso.

Deixa eu ver se entendi.
Você diz eu ter uma espécie de Factory de Tarefas, injetar as Daos nessa Factory e construir as tarefas com new mesmo?
Por exemplo:

@Component
public class TaskFactory {

        private ExecutorService executorService;
        // DAOS
        
        public TaskFactory (ExecutorService executorService, /* DAOS */) {
              this.executorService = executorService;
              // DAOS
        }

        public Future<?> disparaTaskCircuito(Circuito c) {
              ExecutarCircuito ex = new ExecutarCircuito(/* DAOS */);
              ex.setCircuito(c);
              return executorService.submit(ex);         
        }
         
        public Future<?> disparaTaskTemplate(Template t) {
              ExecutarTemplate ex = new ExecutarCircuito(/* DAOS */);
              ex.setTemplate(t);
              return executorService.submit(ex);         
        }

        // ETC

}

Ou eu entendi errado?

isso, entendeu certo, mas eu faria sem os setters:

@Component  
public class TaskFactory {  
  
        private ExecutorService executorService;  
        // DAOS  
          
        public TaskFactory (ExecutorService executorService, /* DAOS */) {  
              this.executorService = executorService;  
              // DAOS  
        }  
  
        public Future<?> disparaTaskCircuito(Circuito c) {  
              ExecutarCircuito ex = new ExecutarCircuito(c, /* DAOS */);  
              return executorService.submit(ex);           
        }  
           
        public Future<?> disparaTaskTemplate(Template t) {  
              ExecutarTemplate ex = new ExecutarTemplate(t, /* DAOS */);
              return executorService.submit(ex);           
        }
  
        // ETC  
  
}  

talvez renomear essa TaskFactory pra Agendador ou algo do tipo

Oi Lucas,

Perfeito. Acho que isso resolve.
Só me tira uma última dúvida relacionada.
Hoje as Daos recebem uma Session do Hibernate intejatada por construtor (usamos a documentação do VRaptor):

registry.register(SessionCreator.class, SessionCreator.class); //cria Session's
registry.register(SessionFactoryCreator.class, SessionFactoryCreator.class)
registry.register(HibernateTransactionInterceptor.class, HibernateTransactionInterceptor.class);

Se eu injetar o Agendador em um controller (indiretamente, através de um objeto na camada de negócios), a Session das Daos vai passar pelo filtro do Open Session in View né? Daí no caso, a Thread que está executando em background (e que, logo, vai durar mais do que o request) não corre o risco de receber aqueles exceptions tipo “Session Already Closed”?
Tem algum jeito de injetar uma Sessao não ligada ao Open Session In View e fazer o controle na mão? Ou, melhor ainda, Fazer com que essas classes passem por um interceptor diferente, que abra a sessão antes de executar a Thread, chame o get() do future e faça o Commit ou Rollback depois?

Abraços,

bom, se vc não vai esperar a task terminar na mesma request vc vai precisar de sessions controladas mesmo…

o que vc pode fazer, por exemplo:
-receber SessionFactory no Agendador e criar os daos passando uma session própria
-criar daos especiais que recebem SessionFactory e fazem todo o controle de abrir e fechar.

PS:
vc criou um CustomProvider, certo?
não precisa mais fazer isso… remova o seu customProvider, a configuração do provider, e ao invés coloque isso no seu web.xml:

<context-param>
        	<param-name>br.com.caelum.vraptor.packages</param-name>
	        <param-value>br.com.caelum.vraptor.util.hibernate</param-value>
    </context-param>

Belezinha Lucas,

Acho que isso resolve o problema.
Já troquei a configuração de XML. Funcionou direitinho.
Muito, muito obrigado mesmo por toda a sua ajuda.
A unica coisa triste é ter que reescrever o código das Daos (trabalho braçal, tem um monte de Daos). Aquela idéia de fazer um interceptor não dá certo pois os interceptors são só para requests né? Tem como eu fazer algum outro esquema qualquer para abrir uma transação antes da execução da Thread e fechar depois?
Tipo usar @PostConstruct e @PreDestroy por exemplo?

Eu cheguei até em pensar em algo assim:

Uma classe abstrata “Tarefa” que implementa Runnable e recebe uma SessionFactory. Ela declara os métodos:
init - abre uma session e a transação, deixa uma referência para a session em escopo protected
destroy - fecha a session no escopo protected
loadDaos - método abstrato
execute - método abstrato.
run - dentro de um try chama init, loadDaos, executa e commit. Faz rollback no catch e chama destroy no finally

Cada tarefa seria uma subclasse de Tarefa e repassaria a SessionFactory para a superclasse via construtor. As subclasses implementariam a lógica da tarefa em si no método execute e criariam os daos que precisa com new no método loadDaos (para tanto passariam a Session em escopo protected para os construtores das Daos).
Eu injetaria a SessionFactory via construtor no Agendador, e criaria as tarefas com new, repassando para elas a SessionFactory.

Essa SessionFactory injetada pelo VRaptor se comportaria bem nesse caso? O que eu quero dizer com isso é, as Sessions criadas através dessa SessionFactory e guardadas em escopo protected seriam independentes? Ou eu corro algum risco dessas Sessions serem também fechadas pelo filtro do Open Session In View, vazarem, sofrerem timeout, ou algo assim?

Abraços,

não precisa reescrever os Daos. É só dar new neles usando outra session, a que vc criou no Agendador

sim, interceptors são só pra request. O que vc pode fazer, se o ExecutorService garantir ordem de execução, é criar uma tarefa pra fechar a session
sempre que criar uma task:

public Future<...> disparaTaskBolinha(Bolinha bolinha) {
    Session session = sessionFactory.openSession();
    //abre transação
    TaskBolinha bolinha = new TaskBolinha(bolinha, new BolinhaDao(session));
    Future f = executorFactory.submit(bolinha);
    executorFactory.submit(new CloseSession(f, session));
    return f;
}

e a implementação do closeSession:

public class CloseSession implements Runnable {
       //construtor que recebe f e session
      public void run() {
            try {
               f.get(5, TimeUnit.MINUTES); //troque isso pra um valor que vc ache aceitável
            } finally {
               //fecha transação
               session.close();
            }
      } 
}

talvez vc queira extrair essa lógica pra algum lugar. (só muda o runnable que vc está usando)

[quote=zeigfried]
Essa SessionFactory injetada pelo VRaptor se comportaria bem nesse caso? O que eu quero dizer com isso é, as Sessions criadas através dessa SessionFactory e guardadas em escopo protected seriam independentes? Ou eu corro algum risco dessas Sessions serem também fechadas pelo filtro do Open Session In View, vazarem, sofrerem timeout, ou algo assim?

Abraços,[/quote]
se vc usar o sessionFactory.openSession(), ele cria uma nova sempre.

criei uma issue no VRaptor pra gente criar um jeito genérico de fazer isso que vc tá querendo:

[]'s

rapaz, eu fiz um que quebrou meu galho nesse aspecto.

fiz assim:

@Component
@ApplicationScoped
public class TaskSchedulerFactory implements ComponentFactory<TaskScheduler> {
	
	private ThreadPoolTaskScheduler scheduler;
	
	@PostConstruct
    public void create() {
        scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();
    }
	
	public TaskScheduler getInstance() {
        return scheduler;
    }
	
	@PreDestroy
    public void destroy() {
        scheduler.destroy();
    }

}

chamo a instância no construtor da classe que vou utilizar assim:

public abstract class AbstractController {
	
	protected TaskScheduler scheduler;
	protected String category = "";
	protected Result result;
	protected Validator validator;
	protected UsuarioHandler usuarioHandler;
	
	public AbstractController(Result result, Validator validator, UsuarioHandler usuarioHandler,
			TaskScheduler scheduler){
		this.result = result;
		this.validator = validator;
		this.usuarioHandler = usuarioHandler;
		this.scheduler = scheduler;
	}

e essa é a rotina que vai executar

scheduler.schedule(new Runnable() {
			
			public void run() {
				try {
					rotina...;
				} catch (MessagingException e) {
					e.printStackTrace();
				}
				
			}
		}, now.getTime());
		

Tá bem genérico, e o pacote que usei foi o TaskScheduler do spring.

Lucas,

Boa! Essa TaskScope seria massa.
Eu acabei fazendo uma coisa bem parecida, só que ao invés de disparar uma Task a parte para fechar a sessão, fiz aquele esquema de uma superclasse abstrata que já implementa o run, abre a session e a transaction, executa um método abstrato e fecha a session depois.
Daí cada Task apenas implementa o método abstrato, tipo um Template method.

public abstract class Task implements Runnable {
	
	@Override
	public void run() {
		try {
			DaoManagerUtils.begin();
			setup();
			executar();
			DaoManagerUtils.commit();	
		} finally {
			DaoManagerUtils.rollback();
		}
		
	}
	
	/**
	 * Deve configurar daos, services, etc
	 */
	public abstract void setup();
	
	/**
	 * Lógica a ser agendada
	 */
	public abstract void executar();
	
}

(Eu estou ligado que esse rollback no finally está feio, mas no código da DaoManagerUtils abaixo fica claro o motivo).
Esse método setup está aí pois se eu passar a sessão / daos prontas no construtor, corro o risco de timeout de sessão até a tarefa começar a executar.
Também estou aproveitando esse mecanismo que foi construído com ThreadLocal abaixo (cada Thread fica com sua própria sessão).

Doravan.

É, nós temos um código de agendamento bem parecido aqui (seguindo o cookbook do VRaptor).
Tivemos só dois problemas com essa técnica.
O primeiro foi controle de transações como discutido acima.
O pessoal que estava no projeto fez um mecanismo de controle de Sessão individual para tasks usando variáveis static e a classe ThreadLocal (http://download.oracle.com/javase/6/docs/api/java/lang/ThreadLocal.html). Funciona bem. O único problema é o risco de vazar sessão se alguém esquecer de dar close() (por isso o Template Method acima).

public class DaoManagerUtils {

	/** The Constant threadSession. */
	private static final ThreadLocal<Session> threadSession = new ThreadLocal<Session>();

	/**
	 * Obtem o(a) session.
	 * 
	 * @return o(a) session
	 */
	public static Session getSession() {
		SessionFactoryCreator factoryCreator = SharedContext.getContainer()
				.instanceFor(SessionFactoryCreator.class);
		try {
			if (threadSession.get() == null) {
				return factoryCreator.getInstance().getCurrentSession();
			}
		} catch (Exception e) {
			if (threadSession.get() == null) {
				threadSession.set(newSession());
			}
		}
		return threadSession.get();
	}

	/**
	 * New session.
	 *
	 * @return the session
	 */
	public static Session newSession() {
		SessionFactoryCreator factoryCreator = SharedContext.getContainer()
				.instanceFor(SessionFactoryCreator.class);
		return factoryCreator.getInstance().openSession();
	}

	/**
	 * Inicia a transação.
	 */
	public static void begin() {
		if (getTransaction() != null && !getTransaction().isActive()) {
			getSession().getTransaction().begin();
		}
	}

	/**
	 * Reverte as operações realizadas.
	 */
	public static void rollback() {
		if (getTransaction() != null && getTransaction().isActive()) {
			getSession().getTransaction().rollback();
			if (threadSession.get() != null) {
				getSession().clear();
			}
		}
		threadSession.set(null);
	}

	/**
	 * Finaliza a operação.
	 */
	public static void commit() {
		getSession().getTransaction().commit();
		if (threadSession.get() != null) {
			getSession().clear();
		}
		threadSession.set(null);
	}

	/**
	 * Obtem a transação.
	 * 
	 * @return Transaction
	 */
	public static Transaction getTransaction() {
		return getSession().getTransaction();
	}

}

A segunda coisa é para excluir uma Task agendada.
Hoje está sendo feita uma coisa assim:

ThreadPoolTaskScheduler tps = (ThreadPoolTaskScheduler) scheduler;
ScheduledThreadPoolExecutor stp = (ScheduledThreadPoolExecutor) tps.getScheduledExecutor();
stp.getQueue().clear();

Daí todas as tasks são lidas do banco novamente. (Está ficando um pouco complicado para dar manutenção nisso devido a quantidade de tipos de tarefas que está crescendo, cada uma precisando de parâmetros diferentes para rodar corretamente).

O método remove() não funciona pois não tenho como guardar a Runnable original. Da mesma forma, não tenho como quardar o objeto ScheduledFuture para chamar cancel(). Eu tenho que dar um jeito de percorrer os Runnables agendados e matar por ID, ou algo assim.
Se alguém tiver uma receita para cancelar um task que foi agendada via scheduler.schedule() sem ter o objeto ScheduledFuture seria ótimo.
Se não, caso alguém saiba fazer isso com Quartz por exemplo, seria ótimo também.

se vc ainda tem a classe Agendador, que recebe a SessionFactory no construtor, sugiro que vc implemente sua classe Task assim:

public abstract class Task implements Runnable {  
    protected Task(SessionFactory factory) {
         this.factory = factory;
    }
    @Override  
    public void run() {
        this.session = factory.openSession();
        Transaction tx = null;
        try {  
            tx = session.beginTransaction();
            setup();  
            executar();  
            tx.commit();     
        } finally {  
            if (tx != null && tx.isActive()) {
                 tx.rollback();
            }
            this.session.close();
            this.session = null;
        }  
          
    }  
      
    /** 
     * Deve configurar daos, services, etc 
     */  
    public abstract void setup();  
      
    /** 
     * Lógica a ser agendada 
     */  
    public abstract void executar();  
      
}

assim vc não precisa da DaoManagerUtils (que é feia) nem da SharedContext (que é muito feia).

Desse jeito, é só na hora de criar a task, passar a sessionFactory no construtor.

PS: Qual a vantagem de usar o taskScheduler ao invés do ExecutorService? o segundo parece bem melhor.

Lucas. Concordo. Pensamos bem parecido hehehe. Eu nem conhecia essa classe ThreadLocal. Até eu entender o que esse código fazia foi um parto. Fica um código estrutural meio mágico… Sei lá.

Novamente eu concordo.
Para mim o TaskScheduler é uma abstração simples do ScheduledThreadPoolExecutor (veja que no fim das contas, o Spring delega para o ScheduledThreadPoolExecutor, que é uma implementação de ExecutorService). A única vantagem que eu consigo pensar para o segundo é poder usar a classe Date() para fazer agendamentos (nada que um bom Wrapper sobre um ScheduledThreadPoolExecutor não resolva).

ThreadPoolTaskScheduler tps = (ThreadPoolTaskScheduler) scheduler; ScheduledThreadPoolExecutor stp = (ScheduledThreadPoolExecutor) tps.getScheduledExecutor();

Esse tópico todo começou pois estamos precisando implementar novas Tasks e o TaskScheduler está trazendo muitos problemas (daí veio a sugestão de trocar ou para um ExecutorService puro ou Quartz. Minha sugestão foi que tentassemos o mais simples primeiro, que seria o ExecutorService).

O contexto da coisa é assim. Esse projeto foi inicialmente desenvolvido por um pessoal fera em Java de outra unidade da empresa em que estou trabalhando. Inicialmente o pessoal seguiu o coobook de scheduling com VRaptor (http://vraptor.caelum.com.br/cookbook/job-scheduling-com-vraptor3-e-spring/) que usa o TaskScheduler. Daí começaram a esbarrar nesses problemas e acho que essa solução foi sendo formada para contornar essas limitações do mecanismo de scheduling e de escopo de transação. A uns três meses atrás o projeto veio para mim (que não tenho muita experiência com VRaptor).

Agora a tendência é que esse módulo de Agendamento cresça bastante com Tasks diferentes. Eu mesmo preciso implementar duas novas Tasks que são centrais para o sistema. A idéia é aproveitar para dar um tapa no módulo agora enquanto ainda dá para reescrever tudo (melhor dar um tapa em 3k linhas de código agora do que em 20k daqui a uns tempos, com o sistema em produção ainda por cima).
Acho que a peça do quebra-cabeça “transações” (que era o mais chato na minha opinião) já encaixou graças a ajuda inestimável do Lucas.
A segunda coisa que tenho que descobrir é como eu faço para cancelar uma Tarefa individual a partir do ID ou algo assim.
Hoje se exclui tudo, vamos no banco e agendamos tudo de novo (o que era fácil enquanto só existiam dois tipos de tasks, porém, vai ficando linearmente mais difícil e complexo conforme a quantidade de tasks cresce).

Abraços,

mas a linha executor.submit(task) retorna um Future, vc não pode usar ele pra cancelar?

no pior dos casos, vc pode guardar esse future numa List (ou num Map<id, future> ) e retornar o id do future. Assim vc consegue cancelar (via future.cancel())

Fala Lucas,

Então. Até pensei em fazer algo por aí, deixando um ConcurrentHashMap em escopo de aplicação junto ao ExecutorService, inserir uma entrada <ID, Future> no Agendador e fazer com que cada Task se exclua do Map ao fim da execução.
Em questão de performance, acho que um sistema projetado para ser usado simultaneamente por umas 60 pessoas aguenta bem né? (Provavelmente o pico seriam umas 400- 500 entradas no Map).

Abraços,

vc já tem uma referencia para a task no ThreadPoolExecutor, então não tem problema… vc poderia até buscar a task no método getQueue se vc quiser, nem precisa do mapa.

só cuidado ao criar o TPE, passando valores compatíveis com o número de requisições que vc vai receber

Fechou.

Vou sentar o dedo aqui.
Eu já tinha tentado percorrer a Queue com o TaskScheduler, mas as referências internas estavam estranhas (não conseguia converter para a minha Runnable, acho que o Spring encapsula o objeto de alguma forma). Vou tentar com um ExecutorService diferente, se não der, mando um mapa.
Ah, última dúvida boba. Para essa segunda abordagem de percorrer a Queue, eu vou ter que sincronizar acesso ao ExecutorService né? O que acontece, por exemplo, se enquanto eu estiver percorrendo a Queue outra Tarefa for agendada, ou apagada?

Abraços,

PS: Como eu faço para marcar um tópico como resolvido?

o getQueue() retorna uma BlockingQueue, não tem problema percorrer ela enquanto alguem remove coisas, fica tranquilo.

pra colocar o tópico como resolvido é só editar o título, adicionando [Resolvido]