Concorrencia com Hibernate

Olá pessoal.

Estou implementando um sistema que recebe dados em tempo real de um serviço AMQP e preciso persistir esses dados em um banco de dados MySQL a medida que recebo as mensagens. Acontece que o volume de dados enviado e a frequência dos envio dos dados é muito grande. Por isso não estou conseguindo salvar no banco a medida que as mensagens chegam.

Gostaria de saber se existe alguma forma de implementar uma Queue de threads para a medida que fosse recebendo as mensagens do servidor AMQP, esta queue gerenciasse a entrada de Threads onde cada uma execute os batchs de sql (via Hibernate).

O que vocês recomendam?

1 curtida

Se você estiver utilizando Java EE, pode usar a api de messageria JMS.

Você pode enfileirar as requisições em uma fila de mensagens, e processa-las de forma assíncrona em consumidores dessa fila. Dessa forma, você pode estabelecer uma política de consumo como achar melhor.

Exemplo:

Digamos que o sistema recebeu 10.000 mensagens (e continua recebendo). Essa mensagens estão enfileiradas num MOM. O container de aplicação vai começar a instanciar os message-driven beans sob demanda para consumir mensagens dessa fila, cada um na sua thread. Dado que cada mensagem é um dado a ser persistido, você pode fazer algo com essa cara:

public void consumir(Fila fila) {
    List<Mensagem> mensagens consumidas = fila.consumirOuTimeout(100, 10 segundos); // consome 100 mensagens ou dá timeout em 10 segundos e retorna todas as que puderam ser consumidas
    // converte cada mensagem em uma entidade
    repositorio.persistir(entidades);
}
1 curtida

No caso é um projeto Java puro, sem interface web, apenas para consumo destas mensagens. Eu já possuo um sdk que me repassa as mensagens, mas para implementar a fila de mensagens para persistir os dados no banco que é minha real dificuldade no momento.

Já li alguns posts sobre ThreadPool.

Então, onde é o gargalo? Cada vez que chega uma mensagem é criada uma nova Thread? Ou é o fato de que vários comandos SQL pequenos estão sendo emitidos e você quer fazer em batch?

Considerando esta classe que acumula uma lista de clientes.

public class CustomersManager {
    private Logger logger = LoggerFactory.getLogger("customer_manager");
    private List<Customer> customers;

    public CustomersManager(){
        customers = new ArrayList<Customer>();
    }
    public void addCustomer(Customer customer) {
        customers.add(customer);
    }

    public void perform(){
        logger.info("Amount of new customers" + customers.size());
        Session session = new GenericDAO().getSession();
        session.beginTransaction();
        int i = 0;
        for (Customer customer: customers) {
            logger.info("Saving customer: " + customer.getName());
            session.save(customer);
            if ( i % 20 == 0 ) { //20, same as the JDBC batch size
                //flush a batch of inserts and release memory:
                session.flush();
                session.clear();
            }
            i++;
        }
        session.getTransaction().commit();
        session.close();
    }
}

Eu teria esta classe que simula a criação simultanea de 10 lotees de clientes e salvaria cada lote usando a classe anterior.

public class CustomThreadPoolExecutor {
    private RejectedExecutionHandlerImpl rejectionHandler;
    private ThreadPoolExecutor executorPool;
    private Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class.getName());

    public CustomThreadPoolExecutor(){
        rejectionHandler = new RejectedExecutionHandlerImpl();
        executorPool = new ThreadPoolExecutor(2, 4, 10,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), rejectionHandler);
    }

    public void perform(CustomersManager customersManager) throws InterruptedException  {
        executorPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("executando save para customer manager");
                customersManager.perform();

                try {
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

e por fim a classe Main com a simução

public static void main(String[] args) throws InterruptedException {
    CustomThreadPoolExecutor poolExecutor = new CustomThreadPoolExecutor();

    for (int i = 0; i < 100; i++) {

        CustomersManager cm = new CustomersManager();
        for (int j = 0; j < 10000; j++) {
            Customer customer = new Customer();
            customer.setName("Customer #"+j);
            cm.addCustomer(customer);
        }

        poolExecutor.perform(cm);
    }
    System.out.println("finisah all tasks");
}