Skip to content

Commit

Permalink
Fix for close() (serialport#150), support flow control (serialport#203).
Browse files Browse the repository at this point in the history
Flow control options haven't been added to windows yet.
  • Loading branch information
giseburt committed Aug 19, 2013
1 parent 6c58e5b commit 512bc80
Show file tree
Hide file tree
Showing 6 changed files with 438 additions and 86 deletions.
2 changes: 1 addition & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
'target_name': 'serialport',
'sources': [
'src/serialport.cpp',
'src/serialport_unix.cpp',
],
'conditions': [
['OS=="win"',
Expand All @@ -20,6 +19,7 @@
{
'sources': [
'src/serialport_unix.cpp',
'src/serialport_poller.cpp',
],
}
],
Expand Down
179 changes: 165 additions & 14 deletions serialport.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

var Buffer = require('buffer').Buffer;
var SerialPortBinding = require("bindings")("serialport.node");
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var fs = require('fs');
var stream = require('stream');
var path = require('path');
var async = require('async');
var child_process = require('child_process');


// Removing check for valid BaudRates due to ticket: #140
// var BAUDRATES = [500000, 230400, 115200, 57600, 38400, 19200, 9600, 4800, 2400, 1800, 1200, 600, 300, 200, 150, 134, 110, 75, 50];

Expand All @@ -23,6 +25,16 @@ var PARITY = ['none', 'even', 'mark', 'odd', 'space'];
var FLOWCONTROLS = ["XON", "XOFF", "XANY", "RTSCTS"];


// Stuff from ReadStream, refactored for our usage:
var kPoolSize = 40 * 1024;
var kMinPoolSpace = 128;
var pool;

function allocNewPool() {
pool = new Buffer(kPoolSize);
pool.used = 0;
}


var parsers = {
raw: function (emitter, buffer) {
Expand Down Expand Up @@ -80,7 +92,7 @@ function SerialPort (path, options, openImmediately) {
throw new Error('Invalid "databits": ' + options.dataBits);
}

options.stopbits = options.stopBits || options.stopbits || _options.stopbits;
options.stopBits = options.stopBits || options.stopbits || _options.stopbits;
if (STOPBITS.indexOf(options.stopBits) == -1) {
throw new Error('Invalid "stopbits": ' + options.stopbits);
}
Expand Down Expand Up @@ -130,19 +142,35 @@ function SerialPort (path, options, openImmediately) {
options.dataCallback = function (data) {
options.parser(self, data);
};

// options.dataReadyCallback = function () {
// self.readStream._read(4024);
// };

options.errorCallback = function (err) {
// console.log("err:", JSON.stringify(err));
self.emit('error', err);
};
options.disconnectedCallback = function () {
if (self.closing) {
return;
}
self.emit('error', new Error("Disconnected"));
self.close();
// self.close();
};

if (process.platform == 'win32') {
path = '\\\\.\\' + path;
} else {
// All other platforms:
this.fd = null;
this.paused = true;
this.bufferSize = options.bufferSize || 64 * 1024;
this.readable = true;
this.reading = false;

if (options.encoding)
this.setEncoding(this.encoding);
}

this.options = options;
Expand All @@ -167,15 +195,20 @@ SerialPort.prototype.open = function (callback) {
return;
}
if (process.platform !== 'win32') {
self.readStream = fs.createReadStream(self.path, { bufferSize: self.options.bufferSize, fd: fd });
self.readStream.on("data", self.options.dataCallback);
self.readStream.on("error", self.options.errorCallback);
self.readStream.on("close", function () {
self.close();
});
self.readStream.on("end", function () {
self.emit('end');
});
// self.readStream = new SerialStream(self.fd, { bufferSize: self.options.bufferSize });
// self.readStream.on("data", self.options.dataCallback);
// self.readStream.on("error", self.options.errorCallback);
// self.readStream.on("close", function () {
// self.close();
// });
// self.readStream.on("end", function () {
// console.log(">>END");
// self.emit('end');
// });
// self.readStream.resume();
self.paused = false;
self.serialPoller = new SerialPortBinding.SerialportPoller(self.fd, function() {self._read();});
self.serialPoller.start();
}

self.emit('open');
Expand Down Expand Up @@ -206,11 +239,120 @@ SerialPort.prototype.write = function (buffer, callback) {
});
};

if (process.platform !== 'win32') {
SerialPort.prototype._read = function() {
var self = this;

console.log(">>READ");
if (!self.readable || self.paused || self.reading) return;

self.reading = true;

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
// users might have refernces to slices on it.
pool = null;
allocNewPool();
}

// Grab another reference to the pool in the case that while we're in the
// thread pool another read() finishes up the pool, and allocates a new
// one.
var thisPool = pool;
var toRead = Math.min(pool.length - pool.used, ~~self.bufferSize);
var start = pool.used;

function afterRead(err, bytesRead, readPool, bytesRequested) {
self.reading = false;
if (err) {
if (err.code && err.code == 'EAGAIN') {
if (self.fd >= 0)
self.serialPoller.start();
} else {
self.fd = null;
self.options.errorCallback(err);
self.readable = false;
return;
}
}

// Since we will often not read the number of bytes requested,
// let's mark the ones we didn't need as available again.
pool.used -= bytesRequested - bytesRead;

console.log(">>ACTUALLY READ: ", bytesRead);

if (bytesRead === 0) {
if (self.fd >= 0)
self.serialPoller.start();
} else {
var b = thisPool.slice(start, start + bytesRead);

// do not emit events if the stream is paused
if (self.paused) {
self.buffer = Buffer.concat([self.buffer, b]);
return;
} else {
self._emitData(b);
}

// do not emit events anymore after we declared the stream unreadable
if (!self.readable) return;

self._read();
}
}

console.log(">>REQUEST READ: ", toRead);
fs.read(self.fd, pool, pool.used, toRead, self.pos, function(err, bytesRead){
var readPool = pool;
var bytesRequested = toRead;
afterRead(err, bytesRead, readPool, bytesRequested);}
);

pool.used += toRead;
};


SerialPort.prototype._emitData = function(d) {
var self = this;
// if (self._decoder) {
// var string = self._decoder.write(d);
// if (string.length) self.options.dataCallback(string);
// } else {
self.options.dataCallback(d);
// }
};

SerialPort.prototype.pause = function() {
var self = this;
self.paused = true;
};


SerialPort.prototype.resume = function() {
var self = this;
self.paused = false;

if (self.buffer) {
var buffer = self.buffer;
self.buffer = null;
self._emitData(buffer);
}

// No longer open?
if (null == self.fd)
return;

self._read();
};

} // if !'win32'

SerialPort.prototype.close = function (callback) {
var self = this;

var fd = this.fd;
this.fd = 0;
var fd = self.fd;

if (self.closing) {
return;
Expand All @@ -226,6 +368,9 @@ SerialPort.prototype.close = function (callback) {
self.closing = true;
try {
if (self.readStream) {
// Make sure we clear the readStream's fd, or it'll try to close() it.
// We already close()d it.
self.readStream.fd = null;
self.readStream.destroy();
}

Expand All @@ -239,6 +384,12 @@ SerialPort.prototype.close = function (callback) {
self.emit('close');
self.removeAllListeners();
self.closing = false;
self.fd = 0;

if (process.platform !== 'win32') {
self.readable = false;
self.serialPoller.close();
}
});
} catch (ex) {
self.closing = false;
Expand Down Expand Up @@ -315,7 +466,7 @@ if (process.platform === 'win32') {

SerialPort.prototype.flush = function (callback) {
var self = this;
var fd = this.fd;
var fd = self.fd;

if (!fd) {
if (callback) {
Expand Down
6 changes: 6 additions & 0 deletions src/serialport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#ifdef WIN32
#define strcasecmp stricmp
#else
#include "serialport_poller.h"
#endif

uv_mutex_t write_queue_mutex;
Expand Down Expand Up @@ -341,6 +343,10 @@ extern "C" {
NODE_SET_METHOD(target, "close", Close);
NODE_SET_METHOD(target, "list", List);
NODE_SET_METHOD(target, "flush", Flush);

#ifndef WIN32
SerialportPoller::Init(target);
#endif
}
}

Expand Down
Loading

0 comments on commit 512bc80

Please sign in to comment.