Concorrência em arquivos

Pessoal,

Por uma série de restrições no ambiente que nossa aplicação está rodando vamos precisar persistir as mensagens (JMS) em disco.

A idéia é que o MDB vai ler as respostas da fila de retorno, gravá-las em arquivos texto (uma mensagem por arquivo??) e um WS vai ser chamado pelo cliente perguntando se existem respostas para serem enviadas.

A idéia é que eu teria um classe que verificaria todos os arquivos de mensagens, criaria uma array com as mensagens e mandaria para o “cara” que chamou o Web Service. Nesse processo eu teria que apagar as mensagens lidas para que outra instância da aplicação não acesse o mesmo arquivo e a resposta seja enviada duas (ou mais) vezes.(A aplicação roda em cluster)

Estou bem preocupado com todo esse problema de concorrência no arquivo texto!

Alguém teria uma sugestão para algo robusto, mesmo que não necessariamente super performático?

OBS: Não posso usar nenhum framework, tem que ser o velho e bom Java 1.4.

Obrigado

Use o nome do arquivo para distinguir se está pronto ou não.

Obrigado louds,

Mas apesar de estar funcionando estão havendo muitas colisões e algumas exceções.

Tentei locar o arquivo de algum forma, para diminuir isso, mas não deu certo porque preciso renomear o arquivo.

Você tem alguma sugestão?

Olhe o código de uma aplicação de teste que fiz (Só copiar na IDE e rodar…mas não repare no código porco, pois foi apenas um teste rápido):

[code]import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.OverlappingFileLockException;

/**
*

  • @author pen_fold
    */
    public class Main {

    private final int READER_DELAY = 8000;
    private final int WRITER_RELAY = 1000;

    private int _collisionsCounter = 0;

    /**

    • @param args the command line arguments
      */
      public static void main(String[] args) {
      // TODO code application logic here

      Main m = new Main();
      m.start();

    }

    private void start() {
    Thread creator = new FileCreator();
    creator.start();

     Thread reader0 = new FileReader("READER 0");
     Thread reader1 = new FileReader("READER 1");
     Thread reader2 = new FileReader("READER 2");
     Thread reader3 = new FileReader("READER 3");
     Thread reader4 = new FileReader("READER 4");
     Thread reader5 = new FileReader("READER 5");
     Thread reader6 = new FileReader("READER 6");
     Thread reader7 = new FileReader("READER 7");
     Thread reader8 = new FileReader("READER 8");
     Thread reader9 = new FileReader("READER 9");
     
     reader0.start();
     reader1.start();
     reader2.start();
     reader3.start();
     reader4.start();
     reader5.start();
     reader6.start();
     reader7.start();
     reader8.start();
     reader9.start();
    

    }

    private class FileReader extends Thread {

     private String _name = null;
     
     public FileReader(String str) {
         _name = str;
     }        
    
     public void run() {
         
         while(true) {
             
             File dir = new File(".");    
             String[] children = dir.list(getFilter());
    
             //FileLock lock = null; //channel.lock();     
             //FileChannel channel = null;
    
             if (children != null) {
                 for (int i=0; i<children.length; i++) {
                     try {                            
                         
                         File file = new File(children[i]);
                         //Gets a file channel for the file
                         //channel = new RandomAccessFile(file, "rw").getChannel();
    
                         /**                              
                          * Tries acquiring the lock without blocking. This method returns
                          * null or throws an exception if the file is already locked.
                          */ 
                         //if (channel != null) {
                         try {
                             
                             //lock = channel.tryLock();
    
                             // Rename file (or directory)
                             File newFile = new File("CUR_" + file.getName().substring(4) + "_" + _name);
    
                             boolean success = file.renameTo(newFile);
                             if (!success) {
                                 //System.out.println(_name + "=>" + "This file is already being used by another thread");
                                 _collisionsCounter++;
                                 continue;
                             }    
    
    
                             BufferedReader in = new BufferedReader(new java.io.FileReader(newFile.getName()));
                             String str;
                             while ((str = in.readLine()) != null) {
                                 System.out.println(_name + " => (" + str + ") Filename:" + file.getName() + " Collisions: " + _collisionsCounter);
                             }
    
                             in.close();                                    
    
                         } catch (OverlappingFileLockException ex) {                                    
                             System.out.println(_name + "=> Exception:" + ex.getMessage());
                             ex.printStackTrace();
                         }
                         //}
                         this.sleep(READER_DELAY);
                         
                     } catch (InterruptedException ex) {
                         System.out.println(_name + "=> Exception:" + ex.getMessage());
                         ex.printStackTrace();
                     } catch (FileNotFoundException ex) {
                         System.out.println(_name + "=> Exception:" + ex.getMessage());
                         ex.printStackTrace();                            
                     } catch (IOException ex) {
                         System.out.println(_name + "=> Exception:" + ex.getMessage());
                         ex.printStackTrace();  
                     } catch (Throwable ex) {
                         System.out.println(_name + "=> Throwable:" + ex.getMessage());
                         ex.printStackTrace();                              
                     } finally {
                         /*try {
                             // Remember to release the lock
                             lock.release();
                             channel.close();
                         } catch (IOException ex) {
                             System.out.println(_name + "=> Exception:" + ex.getMessage());
                             ex.printStackTrace();
                         }*/
                     }                       
                 }
             }
         }
     }
     
     private FilenameFilter getFilter() {
    
         //It's also possible to filter the list of returned files.
         //This example does not return any file that starts with `NEW_'.
         FilenameFilter filter = new FilenameFilter() {
             public boolean accept(File dir, String name) {
                 return name.startsWith("NEW_");
             }
         };
         
         return filter;           
     }
    

    }

    private class FileCreator extends Thread {

     public void run() {
         
         long counter = 0;
         while(true) {
             createFile("NEW_"+ (++counter) + "_" + System.nanoTime());
             try {
                 this.sleep(WRITER_RELAY);
             } catch (InterruptedException ex) {
                 ex.printStackTrace();
             }
         }
         
     }
     
     private void createFile(String strFileName) {
         
         // Stream to write file
         FileOutputStream out = null;		
    
         try
         {
             //Open an output stream
             out = new FileOutputStream (strFileName);
    
             // Print a line of text
             new PrintStream(out).println ("Message => " + System.nanoTime());
    
         }
         // Catches any error conditions
         catch (IOException e)
         {
                 System.err.println ("Unable to write to file");
                 System.exit(-1);
         }
         finally {
             try {
                 // Close our output stream
                 out.close();
             } catch (IOException ex) {
                 ex.printStackTrace();
             }
         }
     }        
    

    }
    }[/code]

Curiosidade, pq vc nao pode persistir em banco?

Coloque uma thread só procurando por arquivos e um pool consumindo eles.

código pseudo-java-de-quem-não-usa-isso-a-2-anos:

using java.util.concurrent;

ThreadPool pool = new ThreadPool ();
while(true){
  for(File f : new File(".").getFiles()) { 
      if (!f.getName().endsWith("__processing")) {
          f.renameTo (f.getName() + "__processing");
          pool.queueMoreStuff (new ProcessFile (f));
      }
   }
}


clas ProcessFile implements Runnable {
   File f;
   public ProcessFile (File f) { this.f = f;}


   public void run() { 
     MyFileUtilsLalala.DáTrêsPulosEUmaVoltinha (f);
   }
}

Se você está usando Java 5 as regras são as seguintes:

-Se está usando threads explicitamente, você deve estar fazendo errado;
-Se está usando locks, você deve estar fazendo errado;
-Se está controlando concorrencia com nome de arquivos ou variaveis, você deve estar fazendo muito errado;
-Pensou denovo e ainda está usando thread, deve estar fazendo besteira mesmo;
-Sério, new Thread() com Java5 deveria lançar um YouAreNotDougLeaException;
-Assim como synchronized/wait deveria lançar um DougLeaNotInTheRoomException

Falo sério, se você não come Java Concurrency in Practice com sucrilhos todo dia no café da manha, não
brinque com isso no trabalho porque não tem graça.

Use o suporte a thread pools no java, seja mais feliz e saia antes das 18hs. http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/package-summary.html#package_description .

Use algo como Executors.newFixedThreadPool(System.getAvailableProcessors() * COEFICIENTE_DE_ESPERA_IO_E_USO_DE_CPU) e seja feliz.
COEFICIENTE_DE_ESPERA_IO_E_USO_DE_CPU pode ser um número entre 2 e ** dependendo da característica da sua carga. Esse vou deixar como
dever de casa para tí.

Happy hacking.