Ajuda em MQ - filas

2 respostas
donatinho

Galera, outra duvida

Tenho esse consumidor implementado no qual fiz o consumidor recuperar registros de uma fila na unha.

Queria saber se existe algo em MQ que, caso desse algum erro na aplicacao, o registro voltasse pra fiila.

Vejam o codigo:

public class ConsumidorMQRecepcaoNFe_10threads_5procs implements Runnable {
	private Session session = null;
	static final Logger log = Logger.getLogger(ConsumidorMQRecepcaoNFe_10threads_5procs.class);
	
	public void run() {
		//System.out.println(new Date());
		onMessage();
	}
	
	public void onMessage() {
		log.info(" ################# RECEBEU A MENSAGEM ");
		Connection connection = null;
		MessageConsumer receiver = null;

		try {
			MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

			connectionFactory.setHostName(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.mq.host.name"));
			connectionFactory.setPort(Integer.parseInt(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.mq.porta")));
			connectionFactory.setChannel(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.mq.channel"));
			connectionFactory.setQueueManager("");
			connectionFactory.setTransportType(Integer.parseInt(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.mq.transport.type")));
			connection = connectionFactory.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			final MQQueue destination = new MQQueue(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.queue.nfe"));
			receiver = session.createConsumer(destination);
			
			connection.start();
			
			int qtdMaxProc = 5;
			
			// tamanho maximo do lote - 500kb
			int tamanhoLote = Integer.parseInt(LerArquivoPropertie.getPropriedade(ParametroDFe.DFE_CONFIGURACOES_PROPERTIES.getValue(), "nfe.dfe.tamanho.arquivo.lote"));
			
			double temposFila[] = new double[50];
			double temposTratarXml[] = new double[50];
			// double temposSession[] = new double[50];
			double temposText[] = new double[50]; 
			
			ManipulaXml map = new ManipulaXml();
			
			String numProtLog = null;
			
			for (int i = 0; ((i < qtdMaxProc) && conjuntoAN.length() < tamanhoLote); i++) {
				final TextMessage textMessage = (TextMessage)receiver.receiveNoWait();
				if(textMessage != null) {
					if(i == 0)
						numProtLog = textMessage.getJMSCorrelationID();
						
					String msg = textMessage.getText();
					map.limparXml();
					map.setXmlDFe(msg);
					tratarXml(msg, Long.parseLong(textMessage.getJMSCorrelationID()), map);
						
					} else {
						log.info(" ################# Atencao!  Não existe mensagem na fila para consumo neste momento!");
				
				}
			} // fim for
			// log.info(" ################# Log para protocolo inicial lote " + numProtLog +  " : Tempo for : " + calcularTempo(inicioFor, System.currentTimeMillis()));
			
			// calculo tempo fila
			double somaTotalFila = 0;
			for (int i = 0; i < temposFila.length; i++) {
				somaTotalFila += temposFila[i];  
			}
			
			// calculo tempo banco
			double somaTotalTratarXml = 0;
			for (int i = 0; i < temposTratarXml.length; i++) {
				somaTotalTratarXml += temposTratarXml[i];
			}
			
			// calculo tempo text
			double somaTotalText = 0;
			for (int i = 0; i < temposText.length; i++) {
				somaTotalText += temposText[i];
			}
			log.info(" ################# Log para protocolo inicial lote " + numProtLog +  " : Tempo Consumo fila " + somaTotalFila + " s. Tempo tratar xml " + somaTotalTratarXml  + " s. Tempo Text " + somaTotalText + "s. Tamanho da lista a ser enviada ao ambiente nacional " + listaControleDFeVO.size() + " e nota enviada é de: " + conjuntoAN.toString().length() + " em bytes");
			
		} catch (JMSException exception) {
			exception.printStackTrace();
			throw new RuntimeException();
		
		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException();
			
		} finally {
			// fechar(consumer);
			fechar(receiver);
			fechar(session);
			fechar(connection);
		}
	}

2 Respostas

drsmachado

Default, não.
Você precisa reenviar e, para isso, precisará lembrar que uma fila trabalha com o conceito FIFO (First In First Out - Primeiro a entrar é o primeiro a sair).
Ou seja, não tem como guardar lugar na fila, saiu dela, já era. Se reenviar, terá que aguardar até que esta mensagem esteja na vez de ser lida.

donatinho

Opa drsmachado , tudo bom?

Obrigado pela ajuda!

Entao, quando recupera a mensagem pelo container, gerandou uma excecao runtimeexcpetion, a mensagem retorna para a fila.

Como a aplicacao agora nao tem o container, caso eu gere uma session.rollback() sera que o registro nao retorna pra fila?

Criado 3 de julho de 2014
Ultima resposta 3 de jul. de 2014
Respostas 2
Participantes 2