Olá pessoal.
Estou implementando um sistema que recebe dados em tempo real de um serviço AMQP e preciso persistir esses dados em um banco de dados MySQL a medida que recebo as mensagens. Acontece que o volume de dados enviado e a frequência dos envio dos dados é muito grande. Por isso não estou conseguindo salvar no banco a medida que as mensagens chegam.
Gostaria de saber se existe alguma forma de implementar uma Queue de threads para a medida que fosse recebendo as mensagens do servidor AMQP, esta queue gerenciasse a entrada de Threads onde cada uma execute os batchs de sql (via Hibernate).
O que vocês recomendam?
1 curtida
Se você estiver utilizando Java EE, pode usar a api de messageria JMS.
Você pode enfileirar as requisições em uma fila de mensagens, e processa-las de forma assíncrona em consumidores dessa fila. Dessa forma, você pode estabelecer uma política de consumo como achar melhor.
Exemplo:
Digamos que o sistema recebeu 10.000 mensagens (e continua recebendo). Essa mensagens estão enfileiradas num MOM. O container de aplicação vai começar a instanciar os message-driven beans sob demanda para consumir mensagens dessa fila, cada um na sua thread. Dado que cada mensagem é um dado a ser persistido, você pode fazer algo com essa cara:
public void consumir(Fila fila) {
List<Mensagem> mensagens consumidas = fila.consumirOuTimeout(100, 10 segundos); // consome 100 mensagens ou dá timeout em 10 segundos e retorna todas as que puderam ser consumidas
// converte cada mensagem em uma entidade
repositorio.persistir(entidades);
}
1 curtida
No caso é um projeto Java puro, sem interface web, apenas para consumo destas mensagens. Eu já possuo um sdk que me repassa as mensagens, mas para implementar a fila de mensagens para persistir os dados no banco que é minha real dificuldade no momento.
Já li alguns posts sobre ThreadPool.
Então, onde é o gargalo? Cada vez que chega uma mensagem é criada uma nova Thread
? Ou é o fato de que vários comandos SQL pequenos estão sendo emitidos e você quer fazer em batch?
Considerando esta classe que acumula uma lista de clientes.
public class CustomersManager {
private Logger logger = LoggerFactory.getLogger("customer_manager");
private List<Customer> customers;
public CustomersManager(){
customers = new ArrayList<Customer>();
}
public void addCustomer(Customer customer) {
customers.add(customer);
}
public void perform(){
logger.info("Amount of new customers" + customers.size());
Session session = new GenericDAO().getSession();
session.beginTransaction();
int i = 0;
for (Customer customer: customers) {
logger.info("Saving customer: " + customer.getName());
session.save(customer);
if ( i % 20 == 0 ) { //20, same as the JDBC batch size
//flush a batch of inserts and release memory:
session.flush();
session.clear();
}
i++;
}
session.getTransaction().commit();
session.close();
}
}
Eu teria esta classe que simula a criação simultanea de 10 lotees de clientes e salvaria cada lote usando a classe anterior.
public class CustomThreadPoolExecutor {
private RejectedExecutionHandlerImpl rejectionHandler;
private ThreadPoolExecutor executorPool;
private Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class.getName());
public CustomThreadPoolExecutor(){
rejectionHandler = new RejectedExecutionHandlerImpl();
executorPool = new ThreadPoolExecutor(2, 4, 10,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), rejectionHandler);
}
public void perform(CustomersManager customersManager) throws InterruptedException {
executorPool.execute(new Runnable() {
@Override
public void run() {
logger.info("executando save para customer manager");
customersManager.perform();
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
e por fim a classe Main com a simução
public static void main(String[] args) throws InterruptedException {
CustomThreadPoolExecutor poolExecutor = new CustomThreadPoolExecutor();
for (int i = 0; i < 100; i++) {
CustomersManager cm = new CustomersManager();
for (int j = 0; j < 10000; j++) {
Customer customer = new Customer();
customer.setName("Customer #"+j);
cm.addCustomer(customer);
}
poolExecutor.perform(cm);
}
System.out.println("finisah all tasks");
}