Tentando ser objetivo, gastei 40 horas nos últimos 3 dias reformulando e refatorando todo o space4j para fazer uso das poderosíssimas classes ConcurrentHashMap (tiger) e ConcurentSkipListMap (mustang).
Essas classes caíram como uma luva para resolver um probrema clássico de todo sistema prevalente: “Como fazer consultas e itinerações ao mesmo tempo em que o map que contem os dados está em constante modificação pelo thread principal do sistema?”
O grande pulo do gato que acredito ter sido o Oracle o pioneiro é: “Writers only get blocked by writers. Readers never get blocked by anything!” Num sistema prevalente vc não quer ter que colocar um READ (equivalente a um SELECT) na mesma fila dos WRITE (comandos que vão efetivamente alterar os dados).
Com essas novas classes concorrentes e super-complexas feito por Doug Lea isso agora se tornou possível para o Java, ou seja, vc pode itinerar por um mapa ou sup-mapas desse map (fazendo consultas complexas) enquanto o mesmo está sofrendo dezenas de alterações sem ganhar uma ConcurrentModficactionException.
Outra área que foi melhorada e simplificada ao extremo é a questão de índices. [color=“red”]Alguém consegue imaginar um banco de dados sem indices, fazendo full table scan para achar usuários entre 20 e 30 anos numa tabela de 100 mil usuários?[/color] O Space4J resolve esses problemas tranquilamente oferecendo 4 tipos de indices: RegularMap (ConcurrentHashMap), SortedMap (ConcurrentSkipListMap), MultiMap (ConcurrentHashMap que possui outros ConcurrentHashMaps) e MultiSortedMap (ConcurrentSkipListMap que possui ConcurentHashMaps). Também suporta qualquer tipo de chave composta (composta = 2 ou mais campos são indexados por um único indice), com comparators padrões para numeros, strings e datas. Vc pode fornecer seu próprio comparator caso queira indexar objetos diferentes de string, númbero e data (raridade). Tudo isso da maneira mais simples possível, abstraindo a complexidade do usuário, ou seja, vc apenas cria o índice e todo o resto (indexacao e re-indexação) acontece por mágica por trás dos panos.
A questão de cluster, fundamental para a questão do snapshot sem colocar o sistema em read-only, também está funcionando sem problemas. Nessa área ainda cabem várias melhorias como utilizar JGroups ou NIO. Entretanto a implementaçõa atual com plain sockets está atendendo as necessidades iniciais, tendo inclusive passado por um stress test que descrevo abaixo.
No stress test, totalmente documentado e acessível através da classe PhoneBookClusterStress, foram colocados dois nodes em cluster, cada um com 5 threads para insert, 5 para delete, 5 para select (simple scan) e 5 para search via indice. Esses dois nodes foram colocados em cluster, com uma tabela (map) inicial com 100 mil registros e milhares de alterações/consultas foram feitas (em ambos os nodes ao mesmo tempo) mantendo a consistencia dos nados e a sincronia dos nodes.
Acredito que há espaço para o re-lançamento desse projeto open-source, talvez como um sub-projeto no site do Mentawai. [color=“blue”]Se alguém estiver interessado em participar, entrem em contato por email (sergio.oliveira.jr at gmail.com) dizendo o porque vc gostaria de participar, sua experiencia com prevalencia e/ou tópicos relacionados como networking, cluster, nio, jgroups, java collections, indexação, banco de dados, serialização, etc.[/color]
Abaixo listo o código final dos exemplos: regular, indexado, cluster e cluster stress.
PhoneBook regular (sem indices!)
package org.space4j.demo.phonebook;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.space4j.CommandException;
import org.space4j.LoggerException;
import org.space4j.Space;
import org.space4j.Space4J;
import org.space4j.command.CreateMapCmd;
import org.space4j.command.CreateSequenceCmd;
import org.space4j.command.IncrementSeqCmd;
import org.space4j.command.PutCmd;
import org.space4j.command.RemoveCmd;
import org.space4j.implementation.SimpleSpace4J;
/**
* This is a simple application demonstrating how Space4J can be easily used as a database.
*
* Is is as simple as it can be. To execute just do:
*
* java org.space4j.demo.phonebook.PhoneBook
*
* @author Sergio Oliveira Jr.
*/
public class PhoneBook {
protected static final String MAP_NAME = "phonenumbers";
protected static final String SEQ_NAME = "phone_seq";
protected Space4J space4j = null;
protected Space space = null;
protected Map<String, String> phones = null;
public PhoneBook(Space4J space4j) throws Exception {
this.space4j = space4j;
// Start the system...
space4j.start();
// Grab the space where all data resides...
space = space4j.getSpace();
// If this is the first time, create our main hashmap...
if (!space.check(MAP_NAME)) {
space4j.exec(new CreateMapCmd<Integer, User>(MAP_NAME));
}
// if this is the first time, create the sequence for the unique ids...
if (!space.check(SEQ_NAME)) {
space4j.exec(new CreateSequenceCmd(SEQ_NAME));
}
phones = (Map<String, String>) space.get(MAP_NAME);
}
public void executeSnapshot() throws Exception {
space4j.executeSnapshot();
}
protected int getNextId() throws Exception {
return space4j.exec(new IncrementSeqCmd(SEQ_NAME));
}
public int addNumber(String first, String last, String phone) throws Exception {
int id = getNextId();
User user = new User(id, first, last, phone);
space4j.exec(new PutCmd(MAP_NAME, id, user));
return id;
}
public Collection<User> findUsers(String last) {
Collection<User> results = new LinkedList<User>();
Iterator<User> iter = space.getSafeIterator(MAP_NAME);
while(iter.hasNext()) {
User u = iter.next();
if (u.last.indexOf(last) != -1) {
results.add(u);
}
}
return results;
}
public boolean delUser(int id) throws CommandException, LoggerException {
int x = space4j.exec(new RemoveCmd(MAP_NAME, id));
return x > 0;
}
public Iterator<User> getUsers() {
return space.getSafeIterator(MAP_NAME);
}
public static class User implements Serializable {
private static final long serialVersionUID = 1;
private int id;
private String first;
private String last;
private String phone;
public User(int id, String first, String last, String phone) {
this.id = id;
this.first = first;
this.last = last;
this.phone = phone;
}
public String toString() {
StringBuilder sb = new StringBuilder(64);
sb.append(id).append(": ").append(first).append(" ").append(last);
sb.append(" ").append(phone);
return sb.toString();
}
public int getId() {
return id;
}
public String getFirstName() {
return first;
}
public String getLastName() {
return last;
}
public String getPhoneNumber() {
return phone;
}
public int hashCode() {
return id;
}
public boolean equals(Object obj) {
if (obj instanceof User) {
User u = (User) obj;
if (u.id == this.id) return true;
}
return false;
}
}
/**
* Just some simple logic for a minimal PhoneBook application.
* NOTE: This is java code, not Space4J code!
*/
public static void run(PhoneBook book) throws Exception {
// supressed for clarity...
}
public static void main(String [] args) throws Exception {
PhoneBook book = new PhoneBook(new SimpleSpace4J("PhoneBook"));
run(book);
}
}
PhoneBook indexado:
package org.space4j.demo.phonebook;
import java.util.Collection;
import java.util.Iterator;
import org.space4j.Space4J;
import org.space4j.implementation.SimpleSpace4J;
import org.space4j.indexing.Index;
import org.space4j.indexing.IndexManager;
import org.space4j.indexing.MultiMap;
/**
* Let's do some indexing !!!
*
* java org.space4j.demo.phonebook.PhoneBookIndexing
*
* @author Sergio Oliveira Jr.
*/
public class PhoneBookIndexing extends PhoneBook {
private static final String INDX_LASTNAME = "indx_lastname";
private final Index indexByLastName;
public PhoneBookIndexing(Space4J space4j) throws Exception {
super(space4j);
// let's create our index in with the last name field...
IndexManager im = space4j.getIndexManager();
// If index does not exist, let's create it...
if (!im.checkIndex(INDX_LASTNAME)) {
Index indx = new Index(INDX_LASTNAME, MAP_NAME, Index.TYPE.MULTI, User.class, "lastName");
boolean ok = im.createIndex(indx, space4j);
if (ok) {
System.out.println(indx + " created!");
} else {
System.out.println(indx + " could not be created!");
}
}
indexByLastName = im.getIndex(INDX_LASTNAME);
}
/*
* Using index to find users with last name...
*/
@Override
public Collection<User> findUsers(String last) {
MultiMap map = (MultiMap) indexByLastName.getMap();
Collection coll = map.get(last);
return coll;
}
public static void main(String [] args) throws Exception {
PhoneBookIndexing book = new PhoneBookIndexing(new SimpleSpace4J("PhoneBook"));
run(book);
}
}
PhoneBook em cluster:
package org.space4j.demo.phonebook;
import org.space4j.Space4J;
import org.space4j.implementation.MasterSpace4J;
import org.space4j.implementation.SlaveSpace4J;
/**
* This is to show you how simple it is to make a Space4J cluster!
* To make a cluster, open up two shells (or DOS) and execute in the first:<br>
* java org.space4j.demos.phonebook.PhoneBookCluster master<br><br>
* and in the second:<br>
* java org.space4j.demos.PhoneBookCluster slave 127.0.0.1<br><br>
* You can also use two different machines. Just pass the master IP address to the slave instead of 127.0.0.1.<br>
* IMPORTANT: You must have the main Space4J dir (space4j_db) mounted, so both machines can have access to it.
*/
public class PhoneBookCluster extends PhoneBookIndexing {
public PhoneBookCluster(Space4J space4j) throws Exception {
super(space4j);
}
static Space4J createSpace4J(String master, String master_ip) throws Exception {
Space4J space4j = null;
if (master.equals("-master")) {
space4j = new MasterSpace4J("PhoneBook");
} else if (master.equals("-slave")) {
if (master_ip != null) {
space4j = new SlaveSpace4J("PhoneBook", master_ip);
} else {
space4j = new SlaveSpace4J("PhoneBook", "127.0.0.1");
}
}
return space4j;
}
public static void main(String [] args) throws Exception {
if (args.length < 1 || args.length > 2) {
System.out.println("format: java PhoneBookCluster [-master|-slave] <IP def: 127.0.0.1>");
return;
}
Space4J space4j = createSpace4J(args[0], args.length > 1 ? args[1] : null);
PhoneBookCluster book = new PhoneBookCluster(space4j);
run(book);
}
}
PhoneBook em cluster com stress test:
package org.space4j.demo.phonebook;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import org.space4j.Space4J;
public class PhoneBookClusterStress extends PhoneBookCluster {
private final Thread[] insertThread;
private final Thread[] deleteThread;
private final Thread[] selectThread;
private final Thread[] searchThread;
private final Thread snapshotThread;
private Random rand1 = new Random();
private Random rand2 = new Random();
private Random rand3 = new Random();
private StringBuilder sb1 = new StringBuilder(32);
private StringBuilder sb2 = new StringBuilder(32);
public PhoneBookClusterStress(Space4J space4j, final int nInsert, final int nDelete, final int nSelect, final int nSearch,
final int sleepTime, final int tableSize, final int snapshotTime) throws Exception {
super(space4j);
// populate the table if needed...
int size = phones.size();
if (size < tableSize) {
int diff = tableSize - size;
for(int i=0;i<diff;i++) {
insertRandom();
System.out.print("\rInserting " + i + "... ");
}
}
System.out.println();
insertThread = new Thread[nInsert];
deleteThread = new Thread[nDelete];
selectThread = new Thread[nSelect];
searchThread = new Thread[nSearch];
for(int i=0;i<nInsert;i++) insertThread[i] = new Thread(new Runnable() {
public void run() {
while(true) {
try {
Thread.sleep(sleepTime);
handleInsert();
} catch(Exception e) {
return;
}
}
}
});
for(int i=0;i<nDelete;i++) deleteThread[i] = new Thread(new Runnable() {
public void run() {
while(true) {
try {
Thread.sleep(sleepTime);
handleDelete();
} catch(Exception e) {
return;
}
}
}
});
for(int i=0;i<nSelect;i++) selectThread[i] = new Thread(new Runnable() {
public void run() {
while(true) {
try {
Thread.sleep(sleepTime);
handleSelect();
} catch(Exception e) {
return;
}
}
}
});
for(int i=0;i<nSearch;i++) searchThread[i] = new Thread(new Runnable() {
public void run() {
while(true) {
try {
Thread.sleep(sleepTime);
handleSearch();
} catch(Exception e) {
return;
}
}
}
});
snapshotThread = new Thread(new Runnable() {
public void run() {
while(true) {
try {
Thread.sleep(snapshotTime);
System.out.println("Executing snapstho...");
PhoneBookClusterStress.this.executeSnapshot();
System.out.println("=============================> Snapshot Complete!");
} catch(Exception e) {
return;
}
}
}
});
}
public void start() {
for(int i=0;i<insertThread.length;i++) insertThread[i].start();
for(int i=0;i<deleteThread.length;i++) deleteThread[i].start();
for(int i=0;i<selectThread.length;i++) selectThread[i].start();
for(int i=0;i<searchThread.length;i++) searchThread[i].start();
snapshotThread.start();
}
protected int insertRandom() throws Exception {
sb1.setLength(0);
sb1.append("Oliveira").append(rand1.nextInt(100) + 1);
return this.addNumber("Sergio", sb1.toString(), "123-4321");
}
protected int deleteRandom() throws Exception {
Iterator iter = phones.keySet().iterator();
int count = rand2.nextInt(phones.size() / 10) + 1;
int index = 0;
int id = -1;
while(iter.hasNext() && index++ ><= count) {
id = (Integer) iter.next();
}
if (id > 0) {
this.delUser(id);
}
return id;
}
private void handleInsert() throws Exception {
int id = insertRandom();
System.out.println("Inserted record number " + id);
}
private void handleDelete() throws Exception {
int id = deleteRandom();
if (id == -1) {
System.out.println("Could not delete anything!");
} else {
System.out.println("Deleted record number " + id);
}
}
private void handleSelect() {
Iterator iter = phones.values().iterator();
int count = 0;
while(iter.hasNext() && count++ <= 300) {
iter.next();
}
System.out.println("Selected " + count + " records!");
}
private void handleSearch() {
sb2.setLength(0);
sb2.append("Oliveira").append(rand3.nextInt(100) + 1);
Collection coll = this.findUsers(sb2.toString());
int count = 0;
Iterator iter = coll.iterator();
while(iter.hasNext()) {
count++;
iter.next();
}
System.out.println("Found " + count + " recods!");
}
public static void main(String[] args) throws Exception {
if (args.length >= 3) {
if (args[2].equals("-nostress")) {
Space4J space4j = createSpace4J(args[0], args[1]);
PhoneBookCluster phone = new PhoneBookCluster(space4j);
run(phone);
return;
}
}
if (args.length != 9) {
System.out.println("format: java PhoneBookStress [-master|-slave] <ip> [-nostress] <number of insert threads> <number of delete threads> " +
"<number of select threads> <number of search threads> <thread sleep time> <table size> <snapshot time>");
return;
}
int nInsert, nDelete, nSelect, nSearch, sleepTime, tableSize, snapTime;
nInsert = Integer.parseInt(args[2]);
nDelete = Integer.parseInt(args[3]);
nSelect = Integer.parseInt(args[4]);
nSearch = Integer.parseInt(args[5]);
sleepTime = Integer.parseInt(args[6]);
tableSize = Integer.parseInt(args[7]);
snapTime = Integer.parseInt(args[8]);
Space4J space4j = createSpace4J(args[0], args[1]);
PhoneBookClusterStress stress = new PhoneBookClusterStress(space4j, nInsert, nDelete, nSelect, nSearch, sleepTime, tableSize, snapTime);
stress.start();
}
}