MongoDB
Operaciones masivas
Buscar..
Observaciones
Construyendo una lista de operaciones de escritura para realizar en forma masiva para una sola colección.
Convertir un campo a otro tipo y actualizar toda la colección en forma masiva
Generalmente, el caso cuando uno quiere cambiar un tipo de campo a otro, por ejemplo, la colección original puede tener campos "numéricos" o "fecha" guardados como cadenas:
{
"name": "Alice",
"salary": "57871",
"dob": "1986-08-21"
},
{
"name": "Bob",
"salary": "48974",
"dob": "1990-11-04"
}
El objetivo sería actualizar una colección enorme como la anterior para
{
"name": "Alice",
"salary": 57871,
"dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
"name": "Bob",
"salary": 48974,
"dob": ISODate("1990-11-04T00:00:00.000Z")
}
Para datos relativamente pequeños, se puede lograr lo anterior mediante la iteración de la colección utilizando una snapshot
con el método forEach()
del cursor y actualizando cada documento de la siguiente manera:
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
db.test.updateOne(
{ "_id": doc._id },
{ "$set": { "salary": newSalary, "dob": newDob } }
);
});
Si bien esto es óptimo para colecciones pequeñas, el rendimiento con colecciones grandes se reduce considerablemente, ya que recorrer un gran conjunto de datos y enviar cada operación de actualización por solicitud al servidor conlleva una penalización computacional.
La API Bulk()
viene al rescate y mejora considerablemente el rendimiento, ya que las operaciones de escritura se envían al servidor solo una vez en forma masiva. La eficiencia se logra ya que el método no envía todas las solicitudes de escritura al servidor (como forEach()
con la instrucción de actualización actual dentro del bucle forEach()
) sino solo una vez cada 1000 solicitudes, lo que hace que las actualizaciones sean más eficientes y más rápidas de lo que es actualmente.
Usando el mismo concepto anterior con el bucle forEach()
para crear los lotes, podemos actualizar la colección de forma masiva de la siguiente manera. En esta demostración, la API Bulk()
disponible en las versiones MongoDB >= 2.6
y < 3.2
utiliza el método initializeUnorderedBulkOp()
para ejecutar en paralelo, así como en un orden no determinístico, las operaciones de escritura en los lotes.
Actualiza todos los documentos en la colección de clientes cambiando los campos de salary
y dob
a valores numerical
y de datetime
y datetime
respectivamente:
var bulk = db.test.initializeUnorderedBulkOp(),
counter = 0; // counter to keep track of the batch update size
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulk.find({ "_id": doc._id }).updateOne({
"$set": { "salary": newSalary, "dob": newDob }
});
counter++; // increment counter
if (counter % 1000 == 0) {
bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
bulk = db.test.initializeUnorderedBulkOp();
}
});
El siguiente ejemplo se aplica a la nueva versión 3.2
MongoDB, que desde entonces ha desaprobado la API Bulk()
y ha proporcionado un conjunto más reciente de apis con bulkWrite()
.
Utiliza los mismos cursores que los anteriores, pero crea las matrices con las operaciones masivas utilizando el mismo método de cursor forEach()
para empujar cada documento de escritura masiva a la matriz. Debido a que los comandos de escritura no pueden aceptar más de 1000 operaciones, es necesario agrupar las operaciones para tener como máximo 1000 operaciones y volver a inicializar la matriz cuando el bucle llegue a la iteración 1000:
var cursor = db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}),
bulkUpdateOps = [];
cursor.snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulkUpdateOps.push({
"updateOne": {
"filter": { "_id": doc._id },
"update": { "$set": { "salary": newSalary, "dob": newDob } }
}
});
if (bulkUpdateOps.length === 1000) {
db.test.bulkWrite(bulkUpdateOps);
bulkUpdateOps = [];
}
});
if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }