Processamento Paralelo em Scala

Olá:

Estou experimentando usar Programação Paralela em Scala. Baseado neste exemplo do site StackOverflow, criei um programa baseado no Problema 1 do Project Euler. Experimentei três modos: O primeiro é uma execução simples, sem paralelismo. O segundo usando a API do java.util.concurrency, através de Executors e Callables. O Terceiro, baseado na sugestão da página do StackOverflow, usando scala.Futures. O objetivo é comparar os tempos de se percorrer o range paralelamente e executar a soma.
Segue-se o código:

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {
  
  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)
  
  def runSingle(max: Int): Int = (1 until max).filter(eval(_)).foldLeft(0)(_ + _)
  
  def runPool(max: Int): Int = {
    
    def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean] { def call = eval(i) }
    
    val pool = Executors.newFixedThreadPool(5)
    val result = (1 until max).filter(i => pool.submit(getCallable(i)).get).foldLeft(0)(_ + _)
    pool.shutdown
    pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
    
    result
  }
  
  def runFutures(max: Int): Int = (1 until max).filter(i => Futures.future(eval(i)).apply).foldLeft(0)(_ + _)
  
  /**
   * f é a função a ser executada. O retorno é uma Tuple2 contendo a soma e o 
   * tempo de execução.
   */
  def test(max: Int, f: Int => Int): (Int, Long) = {
    val t0 = System.currentTimeMillis
    val result = f(max)
    val deltaT = System.currentTimeMillis - t0
    
    (result, deltaT)
  }
  
  
  def main(args : Array[String]) : Unit = {
    val max = 10000
    
    println("Single : " + test(max, runSingle))
    println("Pool   : " + test(max, runPool))
    println("Futures: " + test(max, runFutures))
  }
}

O resultados que obtive foram os seguntes:

  • max = 10:
  • max = 100:
  • max = 1000:
  • max = 10000:

Claramente usando as APis de concorrencia do Java e de Scala não está gerando os resultados esperados. Portanto eu pergunto: Onde estou errando? Qual seria a forma mais adequada de se utilizar a Concorrência? E quanto aos atores de Scala? Como eles poderiam ser utilizados?

Grato,

O resultado que vc esperava era qual? uma solução ser mais rápida que a outra?

Acho que vc esta confundindo programação concorrente (que é a execução por diferentes threads) com paralelismo (execução em paralelo para fins de performance).

Scala não é capaz de explorar paralelismo de threads automaticamente. Alias, não conheco nenhuma linguagem que faça isso. Geralmente essa é uma decisão que fica a critério do programador, e o papel da linguagem é fornecer uma API que seja capaz de tirar proveito desse recurso. Em clojure por exemplo, existe uma variante da função map que é capaz de explorar paralelismo, veja pmap/pcalls. Em Scala não sei como funciona, sugiro que de uma olhada na documentação da linguagem.

O motivo pela qual a solução que usa pool de threads é pouca coisa mais rapida que as demais é porque, apesar de existirem 5 threads, elas não ajudam em quase nada já que comparado aos resto da computação que é feita, executar a expressão (n % 3 == 0) || (n % 5 == 0) é praticamente irrelevante, como mostrado pelos resultados obtidos. Passe o filtro para ser executado pela thread usando pool.invokeAll ao inves de pool.submit, que provavelmente o desempenho ira melhorar.

Sobre Futures, isso não tem nada a ver com paralelismo, é apenas uma forma de programação assincrona.

Atores é um modelo para computação distrubuida, não vejo como poderia ser usada para paralelismo local.

Olá:

Desculpe a demora, mas aí vai a minha resposta. Em primeiro lugar nunca lidei de verdade com multiprocessamento (vide a confusão que você apontou), de forma que parece ainda terei que ralar mais. Além disso ainda não consegui entender o esquema dos Actors de Scala. De qualquer forma, experimentei utilizar ExecutorSevice/Callables e também Actors conforme me foi sugerido aqui.

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {
  
  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)
  
  def closePool(pool: ExecutorService) {
    pool.shutdown
    pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
  }
  
  def runSingle(max: Int): Long = (1 until max).filter(eval(_)).foldLeft(0L)(_ + _)
  
  def runPool(max: Int): Long = {
    
    def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean] { def call = eval(i) }
    
    val pool = Executors.newCachedThreadPool 
    val result = (1 until max).filter(i => pool.submit(getCallable(i)).get).foldLeft(0L)(_ + _)
    closePool(pool)
    
    result
  }
  
  def runPoolJava(max: Int): Long = {
    var sum = 0L
    
    class SumCallable(n: Int) extends Callable[Int] {
      def call = {
        if(eval(n)) sum += n
        n
      }
    }
    
    val callables: java.util.List[Callable[Int]] = new java.util.ArrayList[Callable[Int]](max + 1)
    (1 until max).foreach(i => callables.add(new SumCallable(i)))
    
    val pool = Executors.newCachedThreadPool 
    pool.invokeAll(callables)
    closePool(pool)
    
    sum
  }
  
  def runActors(max: Int): Long = {
    import scala.actors.Actor._
    
    case class Add(n: Int)
    
    case object Result
    
    val act = actor {
      var sum = 0L
      loop {
        react {
          case Add(n) => if (eval(n)) { sum += n }
          case Result => reply(sum); exit
        }
      }
    }
   
    for (i <- 1 until max) act ! Add(i)
   
    act !? Result match {
      case result => result.toString.toLong
    }
  }

  
  /**
   * f é a função a ser executada. O retorno é uma Tuple2 contendo a soma e o 
   * tempo de execução.
   */
  def test(max: Int, f: Int => Long): (Long, Long) = {
    val t0 = System.currentTimeMillis
    val result = f(max)
    val deltaT = System.currentTimeMillis - t0
    
    (result, deltaT)
  }
  
  def testMax(max: Int) {
    println("* Max = " + max)
    println("Single  : " + test(max, runSingle))
    println("Pool    : " + test(max, runPool))
    println("PoolJava: " + test(max, runPoolJava))
    println("Actors  : " + test(max, runActors))
    println
  }
  
  def main(args : Array[String]) : Unit = {
    Array(0, 1, 3, 10, 30, 100, 300, 1000, 3000, 10000, 30000, 100000).foreach(testMax(_))
  }
}

Os resultados, infelizmente não foram muito melhores. E Além disso, no caso de PoolJava (ExecutorSevice/Callables) o resultado tornou-se errado a partir para 10000 e 100000.

  • Max = 0
    Single : (0,14)
    Pool : (0,169)
    PoolJava: (0,2)
    Actors : (0,170)

  • Max = 1
    Single : (0,0)
    Pool : (0,0)
    PoolJava: (0,0)
    Actors : (0,15)

  • Max = 3
    Single : (0,1)
    Pool : (0,19)
    PoolJava: (0,4)
    Actors : (0,8)

  • Max = 10
    Single : (23,0)
    Pool : (23,6)
    PoolJava: (23,16)
    Actors : (23,19)

  • Max = 30
    Single : (195,2)
    Pool : (195,11)
    PoolJava: (195,10)
    Actors : (195,19)

  • Max = 100
    Single : (2318,1)
    Pool : (2318,3)
    PoolJava: (2318,8)
    Actors : (2318,8)

  • Max = 300
    Single : (20850,2)
    Pool : (20850,45)
    PoolJava: (20850,29)
    Actors : (20850,19)

  • Max = 1000
    Single : (233168,8)
    Pool : (233168,71)
    PoolJava: (233168,98)
    Actors : (233168,122)

  • Max = 3000
    Single : (2098500,33)
    Pool : (2098500,203)
    PoolJava: (2098500,147)
    Actors : (2098500,216)

  • Max = 10000
    Single : (23331668,48)
    Pool : (23331668,189)
    PoolJava: (23323633,355)
    Actors : (23331668,417)

  • Max = 30000
    Single : (209985000,62)
    Pool : (209985000,170)
    PoolJava: (209985000,425)
    Actors : (209985000,138)

  • Max = 100000
    Single : (2333316668,12)
    Pool : (2333316668,518)
    PoolJava: (2333253053,1103)
    Actors : (2333316668,315)

Pelo visto ainda há muito o que aprender nesta área.
Mas vou seguir sua sugestão e tentar com algo que exija mais mais processamento, como verificar quais números de 0 até N são primos por exemplo.

Grato,

Já tentou erlang? :wink:
Agora, se quiser ficar no mundo java, tem esse projeto aqui http://akkasource.org/
Ela está na minha study list faz um tempo já.

[quote=fabiofalci]Já tentou erlang? :wink:
Agora, se quiser ficar no mundo java, tem esse projeto aqui http://akkasource.org/
Ela está na minha study list faz um tempo já.[/quote]
Oi! Erlang talvez para mais tarde…
Quanto ao Akka já visto referências a ele em alguns feeds sobre Scala mas nunca prestei muita atenção. Agora vou ler o artigo sobre Actors na Wikipedia e também dois artigos do Martin Odersky (criador do Scala para quem não sabe) a respeito do assunto (aqui e aqui).

Grato,

[quote=Rafael Afonso]Olá:

Desculpe a demora, mas aí vai a minha resposta. Em primeiro lugar nunca lidei de verdade com multiprocessamento (vide a confusão que você apontou), de forma que parece ainda terei que ralar mais. Além disso ainda não consegui entender o esquema dos Actors de Scala. De qualquer forma, experimentei utilizar ExecutorSevice/Callables e também Actors conforme me foi sugerido aqui.

package sandbox

import java.util.concurrent._
import scala.actors._

object TestPool {
  
  def eval(n: Int): Boolean = (n % 3 == 0) || (n % 5 == 0)
  
  def closePool(pool: ExecutorService) {
    pool.shutdown
    pool.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
  }
  
  def runSingle(max: Int): Long = (1 until max).filter(eval(_)).foldLeft(0L)(_ + _)
  
  def runPool(max: Int): Long = {
    
    def getCallable(i: Int): Callable[Boolean] = new Callable[Boolean] { def call = eval(i) }
    
    val pool = Executors.newCachedThreadPool 
    val result = (1 until max).filter(i => pool.submit(getCallable(i)).get).foldLeft(0L)(_ + _)
    closePool(pool)
    
    result
  }
  
  def runPoolJava(max: Int): Long = {
    var sum = 0L
    
    class SumCallable(n: Int) extends Callable[Int] {
      def call = {
        if(eval(n)) sum += n
        n
      }
    }
    
    val callables: java.util.List[Callable[Int]] = new java.util.ArrayList[Callable[Int]](max + 1)
    (1 until max).foreach(i => callables.add(new SumCallable(i)))
    
    val pool = Executors.newCachedThreadPool 
    pool.invokeAll(callables)
    closePool(pool)
    
    sum
  }
  
  def runActors(max: Int): Long = {
    import scala.actors.Actor._
    
    case class Add(n: Int)
    
    case object Result
    
    val act = actor {
      var sum = 0L
      loop {
        react {
          case Add(n) => if (eval(n)) { sum += n }
          case Result => reply(sum); exit
        }
      }
    }
   
    for (i <- 1 until max) act ! Add(i)
   
    act !? Result match {
      case result => result.toString.toLong
    }
  }

  
  /**
   * f é a função a ser executada. O retorno é uma Tuple2 contendo a soma e o 
   * tempo de execução.
   */
  def test(max: Int, f: Int => Long): (Long, Long) = {
    val t0 = System.currentTimeMillis
    val result = f(max)
    val deltaT = System.currentTimeMillis - t0
    
    (result, deltaT)
  }
  
  def testMax(max: Int) {
    println("* Max = " + max)
    println("Single  : " + test(max, runSingle))
    println("Pool    : " + test(max, runPool))
    println("PoolJava: " + test(max, runPoolJava))
    println("Actors  : " + test(max, runActors))
    println
  }
  
  def main(args : Array[String]) : Unit = {
    Array(0, 1, 3, 10, 30, 100, 300, 1000, 3000, 10000, 30000, 100000).foreach(testMax(_))
  }
}

Os resultados, infelizmente não foram muito melhores. E Além disso, no caso de PoolJava (ExecutorSevice/Callables) o resultado tornou-se errado a partir para 10000 e 100000.

  • Max = 0
    Single : (0,14)
    Pool : (0,169)
    PoolJava: (0,2)
    Actors : (0,170)

  • Max = 1
    Single : (0,0)
    Pool : (0,0)
    PoolJava: (0,0)
    Actors : (0,15)

  • Max = 3
    Single : (0,1)
    Pool : (0,19)
    PoolJava: (0,4)
    Actors : (0,8)

  • Max = 10
    Single : (23,0)
    Pool : (23,6)
    PoolJava: (23,16)
    Actors : (23,19)

  • Max = 30
    Single : (195,2)
    Pool : (195,11)
    PoolJava: (195,10)
    Actors : (195,19)

  • Max = 100
    Single : (2318,1)
    Pool : (2318,3)
    PoolJava: (2318,8)
    Actors : (2318,8)

  • Max = 300
    Single : (20850,2)
    Pool : (20850,45)
    PoolJava: (20850,29)
    Actors : (20850,19)

  • Max = 1000
    Single : (233168,8)
    Pool : (233168,71)
    PoolJava: (233168,98)
    Actors : (233168,122)

  • Max = 3000
    Single : (2098500,33)
    Pool : (2098500,203)
    PoolJava: (2098500,147)
    Actors : (2098500,216)

  • Max = 10000
    Single : (23331668,48)
    Pool : (23331668,189)
    PoolJava: (23323633,355)
    Actors : (23331668,417)

  • Max = 30000
    Single : (209985000,62)
    Pool : (209985000,170)
    PoolJava: (209985000,425)
    Actors : (209985000,138)

  • Max = 100000
    Single : (2333316668,12)
    Pool : (2333316668,518)
    PoolJava: (2333253053,1103)
    Actors : (2333316668,315)

Pelo visto ainda há muito o que aprender nesta área.
Mas vou seguir sua sugestão e tentar com algo que exija mais mais processamento, como verificar quais números de 0 até N são primos por exemplo.

Grato,

[/quote]

Voce já falou em multiprocessamento, paralelismo, pool de threads, processamento distribuído alá actors… e eu continuo sem entender que resultados espera conseguir.

Não tem nada de pouco processamento no seu exemplo, como pode ser visto pelos resultados exibidos, vc é que esta distribuindo a carga de maneira errada como falei no primeiro post.

Problema 1 do projeto euler em clojure:

(reduce + (filter (fn [n] (or (zero? (rem n 3)) (zero? (rem n 5)))) (range 1000)))

Em scala:

[quote=mochuara]
Voce já falou em multiprocessamento, paralelismo, pool de threads, processamento distribuído alá actors… e eu continuo sem entender que resultados espera conseguir.

Não tem nada de pouco processamento no seu exemplo, como pode ser visto pelos resultados exibidos, vc é que esta distribuindo a carga de maneira errada como falei no primeiro post.[/quote]
Me desculpe se não estou sendo claro, vou dar um exemplo do que eu pretendo:
Digamos que estou numa sequencia e no momento estou em 1013 (que é primo). Enquanto rodo a verificação de sua “primalidade”, poderia paralelamente verificar 1014 (divisivel por 2)*, 1015 (por 5), 1016 (por 2) e 1017 (por 3), cuja verificação é mais rápida, pois seus divisores são pequenos. Ou seja enquanto um thread faz uma verificação mais demorada, outra pode fazer uma verificação mais rápida e passar para o próximo número disponível. E quando a primeira thread terminar sua verificação tamém pegaria o próximo número da fila.

  • Claro que um algorítmo de verdade pularia os números pares, estou apenas dando um exemplo.

Grato,

Parece que esse tipo de problema não é um bom candidato para processamento paralelo:

http://groups.google.com/group/clojure/msg/47030e46692dcffd