Multithreaded Server

9 respostas
_fs

Oi, boa tarde.

Situação: há um server, que controla todas as conexões dos clients, cada client reside em um thread do server.

Até aqui ótimo.

Mas a dúvida é: Ao receber uma mensagem de um client (que reside num thread) como fazer para o server enviar esta mensagem para os outros client (que estão em outro threads)?

Agradetados adiancimentos. :mrgreen:

9 Respostas

kuchma

“LIPE”:
Situação: há um server, que controla todas as conexões dos clients, cada client reside em um thread do server.

Até aqui ótimo.

Mas a dúvida é: Ao receber uma mensagem de um client (que reside num thread) como fazer para o server enviar esta mensagem para os outros client (que estão em outro threads)?

Uma ideia:

Quando o cara se conectar voce joga a thread dele e um identificador (pode ser o IP p.ex.) numa colecao. Em cada mensagem voce envia o identificador do destinatario. Quando o servidor receber uma mensagem voce pega o identificador, resgata a thread do destinatario e dispara a mensagem para ele. :smiley:

Estou imaginando que essa “thread” que voce mencionou seja um objeto com o qual voce consiga se comunicar com o cliente, certo? :wink:

Depois diz pra gente se esse barato funcionou… Voce ta fazendo um chat? Tenho interesse nesse assunto. :smiley:

Marcio Kuchma

_fs

Então cara, o problema é anterior a esse que você já ajudou a solucionar hehe ainda não preciso selecionar qual client receber qual mensagem. Apenas mandar a mensagem para todo mundo hehe

Aqui está o código do server:

package chat;

import javax.swing.*;
import java.awt.*;
import java.awt.event.*;

import java.io.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class Server extends JFrame
{	
	protected JTextArea display;
	
	private int Conns = 50;
	private Vip vip[];
	
	private ServerSocket server;
	
	protected BufferedReader input;
	protected PrintWriter output;
	
	int inumClient;
	
	public Server()
	{
		super( "Server" );
		
		vip = new Vip[ Conns ];
		
		try
		{
			server = new ServerSocket( 60000, Conns );
		}
		catch( IOException e )
		{
			e.printStackTrace();
			System.exit( 1 );
		}

		Container c = getContentPane();
		
		display = new JTextArea();
		c.add( new JScrollPane( display ), BorderLayout.CENTER );
		
		setSize( 300, 150 );
		show();
	}
	
	protected void finalize()
	{
//	Objects created in run method are finalized when 
//	program terminates and thread exits
	  try
	  {
		  server.close();
	  }
	  catch (IOException e)
	  {
		  System.out.println("Could not close socket");
		  System.exit(-1);
	  }
	}
	
	public void execute()
	{
		for( int i = 0; i <= Conns; i++ )
		{
			try
			{
				vip[ i ] = new Vip( server.accept(), this );
				vip[ i ].start();
				System.out.println( i );
				inumClient++;
			}
			catch( IOException e )
			{
				e.printStackTrace();
				System.exit( 1 );
			}
		}
	}	
	
	public static void main( String args[] ) throws IOException
	{
		Server chat = new Server();
		
		chat.addWindowListener( 
			new WindowAdapter()
			{
				public void windowClosing( WindowEvent e )
				{
					System.exit( 0 );
				}
			}
		);
		
		chat.execute();
	}
}
	
class Vip extends Thread
{
	private Socket skt;
	protected BufferedReader input;
	protected PrintWriter output;
	private Server s;
	
	public Vip( Socket sock, Server serv )
	{
		skt = sock;
		s = serv;
	}
	
	public void run()
	{
		String message = "";

		try
		{
			skt.setTcpNoDelay(true); // http://jguru.com/faq/view.jsp?EID=42242
			
			s.display.append( "
Esperando uma conexão
");
				
			output = new PrintWriter( skt.getOutputStream() );
			input = new BufferedReader( new InputStreamReader( skt.getInputStream() ) );
			
			s.display.append( "
Got I/O streams" );
			s.display.append( "
Conexão bem sucedida. De client nº "+ s.inumClient + "
" );
			
			while( !message.equals( "a" ) )
			{
				message = ( String ) input.readLine();
				
				if( !message.equals( "" ) )
				{
					s.display.append( "
" + message );
					s.display.setCaretPosition( s.display.getText().length() );
				}
				
				output.println( "SERVER:: " + message );
				output.flush();
			}
			skt.close();
			input.close();
 			output.close();
		}
		catch( IOException e )
		{
			e.printStackTrace();
		}

		s.display.append( "
O cliente terminou a conexão." );
	}
}

Entendeu o que quis dizer com Thread? Então, cada client que se conecta, o server cria um thread para lidar com cada um deles.

Isso faz com que a comunicação client x server seja perfeita, mas a comunicação server x clientS (plural) não existe.

A minha dúvida é como fazer o server mandar um broadcast para todos os clients.

kuchma

“LIPE”:
Isso faz com que a comunicação client x server seja perfeita, mas a comunicação server x clientS (plural) não existe.

A minha dúvida é como fazer o server mandar um broadcast para todos os clients.

Hmmm, saquei o problema. :smiley:

Bom, vou ser sincero - (ainda) nao li teu codigo inteiro. :wink:

Mas pela tua duvida (melhor coisa que tem eh quando a gente consegue definir nossa duvida em poucas e claras palavras :)) eu posso sugerir duas coisas:

  • Veja como funciona a classe DatagramSocket e como acopla-la num endereco de multicast (um endereco de multicast eh igual um endereco comum, mas ele fica numa faixa especial - de 244.0.0.0 ate 239.255.255.255 acho). Dessa maneira voce conseguiria disparar mensagens para um endereco de multicast e todos que estivessem “ouvindo” neste endereco receberiam as mensagens. Mas nao sei como isso se encaixa no seu projeto atual - sockets de datagramas sao diferentes de stream sockets.

  • Outra maneira mais “tosca” seria voce percorrer sua colecao que armazenam os sockets dos clientes e enviar a mensagem para cada um individualmente. Ta, eu sei que isso nao eh broadcast, mas pode ser uma opcao. :smiley:

Estou trabalhando num projeto semelhante ao teu… comente teus progressos em fazer o broadcast. :smiley:

Marcio Kuchma

louds

Crie 1 fila de mensagens por cliente conectado e coloque cada thread consumindo da sua fila e produzindo para as demais.

algo +/- assim:

class Queue {
LinkedList queue = new LinkedList();
public synchronized void enqueue(Object obj) {
  queue.add(obj);
}

public synchronized Object dequeue() {
  if(queue.isEmpty())
   return null;
  return queue.removeFirst();
}

}

Coisa simples ate aqui ne?
Ok, agora voce cria 1 class que faz a multiplexagem das mensagens:

class QueueMultiplexor {
Collection queues = new HashSet();

public synchronized void register(Queue q) {
  queues.add(q);
}

public syhchronized void unregister(Queue q) {
  queues.remove(q);
}

public syhchronized void multiplex(Object msg, Queue source) {
   for(Iterator i = queues.iterator(); i.hasNext(); ) {
     Queue q = (Queue)i.next();
     if(i != source) 
       q.enqueue(msg);
  }
}

}

Esse cara pega e distribui 1 objeto vindo 1 uma fila para todas as demais.
E como voce usa isso? No loop das tuas thread que ficam lendo a entrada faça assim:

Queue me = new Queue();
       multiplexor.register(me);
        while( !message.equals( "a" ) ) 
         { 
            message = ( String ) input.readLine(); 
             
            if( !message.equals( "" ) ) 
            { 
               s.display.append( "
" + message ); 
               s.display.setCaretPosition( s.display.getText().length() ); 
               multiplexor.multiplex(message, me);
            }

            Object msg = null;
            while((msg = me.dequeue()) != null) 
                 output.println(msg);
 
            output.println( "SERVER:: " + message ); 
            output.flush(); 
         } 

        multiplexor.deregister(me);
_fs

kuchma - valeu mesmo, e pelo que andei pesquisando datagramSocket é realmente mais utilizado para este tipo de coisa, mas trabalhar com UDP me dá nos nervos hehe
Pode deixar que meu progresso deixará uma trilha de posts aqui :slight_smile:

Louds - muitissimo obrigado por responder, vou estudar melhor o código que você postou, pois admito ainda não conhecer alguns conceitos que você usou. Mas o jeito que você fez realmente me parece oferecer mais controle sobre as mensagens. Valeu!

_fs

Louds … não sei como te dizer isso mas … não rolou hehe mas vou continuar nesse caminho.

ps.: sim, eu arrumei os errinhos de digitação.

louds

cara, eu não testei esse código e te digo que ele tem grande problema.
Sockets no java são blocantes, ou seja, eles ficam parados enquanto não aparecer dados para serem lidos.

E isso vai fazer meu exemplo não funcionar, já que as mensagens da fila serão somente transmitidas para o cliente quando ele mandar algo pro servidor.

Exsitem algumas formas de resolver isso:

-Use o método available() antes de read() para verificar se existem dados disponiveis para leitura no socket.
-Usar read() com timeout pequeno.
-Usar sockets não-blocantes e read() pooling
-Usar sockets não-blocantes e io multiplexing (via select)

Minha sugestão é você tentar com os 2 primeiros que são beem mais simples

_fs

Não expliquei direito quando disse que não rolou. O server manda a mensagem para o cliente, mas para 1 cliente apenas (aquele que mandou a mensagem).

Pelo fato do método dequeue ser synchonized, ele não garantiria que a ação seria realizada por todos os threads?

E, pelo que entendi, a mágica toda de mandar a mensagem para todos os clients acontece aqui, não é?

while( ( msg = me.dequeue() ) != null ) 
					output.println( "SERVER :: " + message );

Valeu pelas dicas cara :slight_smile: vou pesquisar.

F
bom krinha se vc ainda tiver duvida em relacao a isso eu achei um negocio que pdoe te esclarecer
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Vector;


public class ServidorDeEco extends Thread {
	public static void main (String arg[]){
		clientes = new Vector();
		try{
			//criando socket para escutar porta 40000
			ServerSocket s = new ServerSocket (40000);
			while (true){
				//aguarda alguem se conectar. A execucao do servidor
				Socket conexao = s.accept();
				//obtendo os objetos de controle de fluxo de comunicacao
				Thread t = new ServidorDeEco(conexao);
				t.start();
			}
		}catch(IOException e){
			System.out.println("IOException: "+e);
		}
	}
	private static Vector clientes;
	private Socket conexao;
	private String meunome;
	public ServidorDeEco(Socket s){	
		conexao = s;}
	public void run(){
	try{
		BufferedReader entrada = new BufferedReader(new InputStreamReader(conexao.getInputStream()));
		PrintStream saida = new PrintStream (conexao.getOutputStream());
		meunome = entrada.readLine();
		if (meunome == null ){return;}
		clientes.add(saida);
		String linha = entrada.readLine();
		while (linha != null && !(linha.trim().equals(""))){
			sendToAll(saida, " disse: ", linha);
			linha = entrada.readLine();
		}
		sendToAll(saida, " saiu: ", " do chat! ");
		clientes.remove(saida);
		conexao.close();
	}catch(IOException e){
		System.out.println("IOException: "+e);
	}
	}
  
	public void sendToAll(PrintStream saida, String acao, String linha) throws IOException{
		Enumeration e = clientes.elements();
		while(e.hasMoreElements()){
			PrintStream chat = (PrintStream) e.nextElement();
		 if(chat != saida){
			 chat.println(meunome+acao+linha);}
		}
	}
}
cliente:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;


public class ClientedeEco extends Thread{
	private static boolean done = false;
	public static void main(String args[]){
		try{
			Socket conexao = new Socket("localhost",40000);
		
			PrintStream saida = new PrintStream(conexao.getOutputStream());
			
			BufferedReader teclado = new BufferedReader(new InputStreamReader(System.in));
			
			System.out.println("Entre com seu nome: ");
			
			String meunome = teclado.readLine();
			
			saida.println(meunome);
			
			Thread t = new ClientedeEco(conexao);
			t.start();
			
			String linha;
			while (true){
				System.out.println(" ");
				linha = teclado.readLine();
				if(done){break;}
				saida.println(linha);
			}
			
		}catch(IOException e ){
			System.out.println("IOException: "+e);
		}
	}
	private Socket conexao;
	public ClientedeEco(Socket s){
		conexao = s;
	}
	public void run(){
		try{
			BufferedReader entrada = new BufferedReader(new InputStreamReader(conexao.getInputStream()));
			String linha;
			while(true){
				linha = entrada.readLine();
				if (linha == null){
					System.out.println("Conexao Encerrada !");
					break;
				}
				System.out.println();
				System.out.println(linha);
				System.out.println("...> ");
			}	
		}catch(IOException e){
			System.out.println("IOException: "+e);
		}
		done = true;
	}
}
Criado 24 de outubro de 2003
Ultima resposta 18 de jan. de 2011
Respostas 9
Participantes 4