Поиск…


параметры

параметр Определение
Считываемый поток тип потока, где данные могут быть считаны из
Считываемый поток тип потока, в котором данные могут быть записаны в
Дуплексный поток тип потока, который доступен как для чтения, так и для записи
Преобразовать поток тип дуплексного потока, который может преобразовывать данные по мере их чтения, а затем записывать

Чтение данных из TextFile с потоками

I / O в узле является асинхронным, поэтому взаимодействие с диском и сетью связано с передачей обратных вызовов к функциям. Возможно, у вас возникнет соблазн написать код, который обслуживает файл с диска следующим образом:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

Этот код работает, но он громоздкий и буферизирует весь файл data.txt в памяти для каждого запроса, прежде чем записывать результат обратно клиентам. Если data.txt очень большой, ваша программа может начать есть много памяти, поскольку она обслуживает множество пользователей одновременно, особенно для пользователей с медленными соединениями.

У пользователей плохой опыт, потому что пользователям придется ждать, пока весь файл будет буферизован в память на вашем сервере, прежде чем они смогут начать получать какое-либо содержимое.

К счастью, оба аргумента (req, res) - это потоки, что означает, что мы можем записать это гораздо лучше, используя fs.createReadStream () вместо fs.readFile ():

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

Здесь .pipe () заботится о прослушивании событий «data» и «end» из fs.createReadStream (). Этот код не только более чистый, но теперь файл data.txt будет записываться клиентам на один фрагмент одновременно сразу же после их получения с диска.

Трубопроводы

Считываемые потоки могут быть «подключены по каналам» или подключены к записываемым потокам. Это делает поток данных из потока источника в поток назначения без особых усилий.

var fs = require('fs')

var readable = fs.createReadStream('file1.txt')
var writable = fs.createWriteStream('file2.txt')

readable.pipe(writable) // returns writable

Когда записываемые потоки также являются читаемыми потоками, т. Е. Когда они являются дуплексными потоками, вы можете продолжать передавать их другим записываемым потокам.

var zlib = require('zlib')

fs.createReadStream('style.css')
  .pipe(zlib.createGzip()) // The returned object, zlib.Gzip, is a duplex stream.
  .pipe(fs.createWriteStream('style.css.gz')

Считываемые потоки также могут быть переданы в несколько потоков.

var readable = fs.createReadStream('source.css')
readable.pipe(zlib.createGzip()).pipe(fs.createWriteStream('output.css.gz'))
readable.pipe(fs.createWriteStream('output.css')

Обратите внимание, что перед любыми потоками данных вы должны синхронно (в то же время) подключаться к выходным потокам. Несоблюдение этого требования может привести к потоку неполных данных.

Также обратите внимание, что объекты потока могут вызывать error ; обязательно ответственно обрабатывайте эти события на каждом потоке, если необходимо:

var readable = fs.createReadStream('file3.txt')
var writable = fs.createWriteStream('file4.txt')
readable.pipe(writable)
readable.on('error', console.error)
writable.on('error', console.error)

Создание собственного читаемого / записываемого потока

Мы увидим, что объекты потока возвращаются модулями типа fs и т. Д., Но что, если мы хотим создать собственный поточный объект.

Чтобы создать объект Stream, нам нужно использовать модуль потока, предоставляемый NodeJs

    var fs = require("fs");
    var stream = require("stream").Writable;
    
    /* 
     *  Implementing the write function in writable stream class.
     *  This is the function which will be used when other stream is piped into this 
     *  writable stream.
     */
    stream.prototype._write = function(chunk, data){
        console.log(data);
    }
    
    var customStream = new stream();
    
    fs.createReadStream("am1.js").pipe(customStream);

Это даст нам собственный пользовательский поток, доступный для записи. мы можем реализовать что-либо в функции _write . Выше метод работает в версии NodeJs 4.xx, но в NodeJs 6.x ES6 введены классы, поэтому синтаксис изменился. Ниже приведен код для версии 6.x NodeJs

    const Writable = require('stream').Writable;
    
    class MyWritable extends Writable {
      constructor(options) {
        super(options);
      }
    
      _write(chunk, encoding, callback) {
        console.log(chunk);
      }
    }

Почему потоки?

Давайте рассмотрим следующие два примера для чтения содержимого файла:

Первый, который использует метод async для чтения файла и предоставляет функцию обратного вызова, которая вызывается после полного чтения файла в памяти:

fs.readFile(`${__dirname}/utils.js`, (err, data) => {
  if (err) {
    handleError(err);
  } else {
    console.log(data.toString());
  }
})

А вторая, которая использует streams для чтения содержимого файла, по частям:

var fileStream = fs.createReadStream(`${__dirname}/file`);
var fileContent = '';
fileStream.on('data', data => {
  fileContent += data.toString();
})

fileStream.on('end', () => {
  console.log(fileContent);
})

fileStream.on('error', err => {
  handleError(err)
})

Стоит отметить, что оба примера делают то же самое . В чем же тогда разница?

  • Первый из них короче и выглядит более элегантно
  • Второй позволяет вам обрабатывать файл во время его чтения (!)

Когда файлы, с которыми вы имеете дело, малы, тогда нет реального эффекта при использовании streams , но что происходит, когда файл большой? (настолько большой, что требуется 10 секунд, чтобы прочитать его в памяти)

Без streams вы будете ждать, ничего не делая абсолютно ничего (если только ваш процесс не делает других вещей), до тех пор, пока не пройдет 10 секунд, и файл не будет полностью прочитан , и только тогда вы сможете начать обработку файла.

С streams вы получаете содержимое файла по частям, прямо, когда они доступны, - и это позволяет обрабатывать файл во время его чтения.


В приведенном выше примере не показано, как streams могут использоваться для работы, которые невозможно выполнить при переходе на режим обратного вызова, поэтому рассмотрим другой пример:

Я хотел бы загрузить gzip файл, разархивировать его и сохранить его содержимое на диск. Учитывая url файла, это то, что нужно сделать:

  • Загрузите файл
  • Разархивируйте файл
  • Сохранить на диске

Вот [маленький файл] [1], который хранится в моем хранилище S3 . Следующий код делает это в обратном вызове.

var startTime = Date.now()
s3.getObject({Bucket: 'some-bucket', Key: 'tweets.gz'}, (err, data) => {
  // here, the whole file was downloaded

  zlib.gunzip(data.Body, (err, data) => {
    // here, the whole file was unzipped

    fs.writeFile(`${__dirname}/tweets.json`, data, err => {
      if (err) console.error(err)

      // here, the whole file was written to disk
      var endTime = Date.now()
      console.log(`${endTime - startTime} milliseconds`) // 1339 milliseconds
    })
  })
})

// 1339 milliseconds

Вот как это выглядит, используя streams :

s3.getObject({Bucket: 'some-bucket', Key: 'tweets.gz'}).createReadStream()
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream(`${__dirname}/tweets.json`));

// 1204 milliseconds

Да, это не быстрее при работе с небольшими файлами - проверенный вес файлов 80KB . Тестирование этого файла в более 71MB файле, 71MB gzipped ( 382MB распаковкой), показывает, что версия streams намного быстрее

  • Потребовалось 20925 миллисекунд, чтобы загрузить 71MB , разархивировать его, а затем записать 382MB на диск - используя метод обратного вызова .
  • Для сравнения потребовалось 13434 миллисекунды, чтобы сделать то же самое при использовании версии streams (на 35% быстрее, для не очень большого файла)


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow