MongoDB
Massenvorgänge
Suche…
Bemerkungen
Erstellen einer Liste von Schreibvorgängen, die für eine einzelne Sammlung in Massen ausgeführt werden sollen.
Konvertieren eines Felds in einen anderen Typ und Aktualisieren der gesamten Sammlung in Massen
In der Regel ist es der Fall, wenn ein Feldtyp in einen anderen geändert werden soll. In der Originalsammlung können beispielsweise "numerische" oder "Datums" -Felder als Zeichenfolgen gespeichert sein:
{
"name": "Alice",
"salary": "57871",
"dob": "1986-08-21"
},
{
"name": "Bob",
"salary": "48974",
"dob": "1990-11-04"
}
Das Ziel wäre, eine riesige Sammlung wie die oben genannte zu aktualisieren
{
"name": "Alice",
"salary": 57871,
"dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
"name": "Bob",
"salary": 48974,
"dob": ISODate("1990-11-04T00:00:00.000Z")
}
Bei relativ kleinen Daten können Sie dies erreichen, indem Sie die Sammlung mithilfe einer snapshot
mit der forEach()
Methode des Cursors iterieren und jedes Dokument wie folgt aktualisieren:
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
db.test.updateOne(
{ "_id": doc._id },
{ "$set": { "salary": newSalary, "dob": newDob } }
);
});
Während dies für kleine Sammlungen optimal ist, wird die Leistung bei großen Sammlungen erheblich reduziert, da das Durchlaufen einer großen Datenmenge und das Senden jedes Aktualisierungsvorgangs pro Anforderung an den Server eine Rechenstrafe nach sich zieht.
Die Bulk()
API hilft dabei und verbessert die Leistung erheblich, da Schreibvorgänge nur einmal an den Server gesendet werden. Die Effizienz wird erreicht, da die Methode nicht jede Schreibanforderung an den Server sendet (wie bei der aktuellen Aktualisierungsanweisung innerhalb der forEach()
Schleife), sondern nur einmal pro 1000 Anfragen, wodurch Aktualisierungen effizienter und schneller als derzeit gemacht werden.
Wenn Sie dasselbe Konzept wie bei der forEach()
Schleife verwenden, um die Stapel zu erstellen, können Sie die Sammlung wie folgt in großen Mengen aktualisieren. In dieser Demonstration verwendet die in den MongoDB-Versionen >= 2.6
und < 3.2
verfügbare Bulk()
API die Methode initializeUnorderedBulkOp()
, um die Schreibvorgänge in den Batches parallel und in nicht-deterministischer Reihenfolge auszuführen.
Es aktualisiert alle Dokumente in der dob
indem die salary
und dob
Felder in numerical
bzw. datetime
Werte datetime
werden:
var bulk = db.test.initializeUnorderedBulkOp(),
counter = 0; // counter to keep track of the batch update size
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulk.find({ "_id": doc._id }).updateOne({
"$set": { "salary": newSalary, "dob": newDob }
});
counter++; // increment counter
if (counter % 1000 == 0) {
bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
bulk = db.test.initializeUnorderedBulkOp();
}
});
Das nächste Beispiel gilt für die neue MongoDB-Version 3.2
die die Bulk()
API seitdem nicht mehr unterstützt und einen neueren Satz von Apis mithilfe von bulkWrite()
.
Es verwendet die gleichen Cursor wie oben, erstellt jedoch die Arrays mit den Massenvorgängen unter Verwendung der gleichen forEach()
Cursor-Methode, um jedes Massenschreibdokument in das Array zu verschieben. Da Schreibbefehle nicht mehr als 1000 Operationen akzeptieren können, müssen Operationen gruppiert werden, um höchstens 1000 Operationen zu haben und das Array neu zu initialisieren, wenn die Schleife die 1000-Iteration erreicht:
var cursor = db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}),
bulkUpdateOps = [];
cursor.snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulkUpdateOps.push({
"updateOne": {
"filter": { "_id": doc._id },
"update": { "$set": { "salary": newSalary, "dob": newDob } }
}
});
if (bulkUpdateOps.length === 1000) {
db.test.bulkWrite(bulkUpdateOps);
bulkUpdateOps = [];
}
});
if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }