MongoDB
Operacje masowe
Szukaj…
Uwagi
Konstruowanie listy operacji zapisu do masowego wykonania dla pojedynczej kolekcji.
Konwertowanie pola na inny typ i aktualizacja całej kolekcji luzem
Zwykle ma to miejsce, gdy chce się zmienić typ pola na inny, na przykład w oryginalnej kolekcji mogą być zapisane pola „numeryczne” lub „data” jako ciągi znaków:
{
"name": "Alice",
"salary": "57871",
"dob": "1986-08-21"
},
{
"name": "Bob",
"salary": "48974",
"dob": "1990-11-04"
}
Celem byłoby zaktualizowanie tak ogromnej kolekcji jak wyżej
{
"name": "Alice",
"salary": 57871,
"dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
"name": "Bob",
"salary": 48974,
"dob": ISODate("1990-11-04T00:00:00.000Z")
}
W przypadku stosunkowo małych danych można to osiągnąć, iterując kolekcję za pomocą snapshot
metodą forEach()
kursora i aktualizując każdy dokument w następujący sposób:
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
db.test.updateOne(
{ "_id": doc._id },
{ "$set": { "salary": newSalary, "dob": newDob } }
);
});
Chociaż jest to optymalne w przypadku małych kolekcji, wydajność dużych kolekcji jest znacznie zmniejszona, ponieważ zapętlanie dużego zbioru danych i wysyłanie każdej operacji aktualizacji na żądanie do serwera powoduje karę obliczeniową.
Na pomoc przychodzi interfejs API Bulk()
który znacznie poprawia wydajność, ponieważ operacje zapisu są wysyłane na serwer tylko raz masowo. Wydajność została osiągnięta, ponieważ metoda nie wysyła każdego żądania zapisu do serwera (jak w przypadku bieżącej instrukcji aktualizacji w pętli forEach()
), ale tylko raz na każde 1000 żądań, dzięki czemu aktualizacje są wydajniejsze i szybsze niż obecnie.
Korzystając z tej samej koncepcji powyżej w pętli forEach()
do tworzenia partii, możemy zbiorczo zaktualizować kolekcję w następujący sposób. W tej demonstracji interfejs API Bulk()
dostępny w wersjach MongoDB >= 2.6
i < 3.2
używa metody initializeUnorderedBulkOp()
do równoległego wykonywania, a także w niedeterministycznej kolejności, operacji zapisu w partiach.
Aktualizuje wszystkie dokumenty w kolekcji klientów poprzez zmianę salary
i dob
pola do numerical
i datetime
wartości odpowiednio:
var bulk = db.test.initializeUnorderedBulkOp(),
counter = 0; // counter to keep track of the batch update size
db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulk.find({ "_id": doc._id }).updateOne({
"$set": { "salary": newSalary, "dob": newDob }
});
counter++; // increment counter
if (counter % 1000 == 0) {
bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
bulk = db.test.initializeUnorderedBulkOp();
}
});
Następny przykład dotyczy nowej wersji MongoDB w wersji 3.2
która odtąd przestała bulkWrite()
API Bulk()
i udostępniła nowszy zestaw apis za pomocą bulkWrite()
.
Używa tych samych kursorów jak powyżej, ale tworzy tablice z operacjami zbiorczymi przy użyciu tej samej metody kursora forEach()
celu wypchnięcia każdego dokumentu zapisu zbiorczego do tablicy. Ponieważ polecenia zapisu akceptują nie więcej niż 1000 operacji, konieczne jest grupowanie operacji, aby mieć maksymalnie 1000 operacji i ponowne zainicjowanie tablicy, gdy pętla osiągnie iterację 1000:
var cursor = db.test.find({
"salary": { "$exists": true, "$type": 2 },
"dob": { "$exists": true, "$type": 2 }
}),
bulkUpdateOps = [];
cursor.snapshot().forEach(function(doc){
var newSalary = parseInt(doc.salary),
newDob = new ISODate(doc.dob);
bulkUpdateOps.push({
"updateOne": {
"filter": { "_id": doc._id },
"update": { "$set": { "salary": newSalary, "dob": newDob } }
}
});
if (bulkUpdateOps.length === 1000) {
db.test.bulkWrite(bulkUpdateOps);
bulkUpdateOps = [];
}
});
if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }