使用 WebSocket 和 Node.JS Stream 构建 HTTP 隧道

在开发与第三方服务集成的应用程序或机器人时,通常需要将本地开发服务器暴露在 Internet 上以接收 Webhook 消息。要实现这一点,需要为本地服务器创建 HTTP 隧道。本文演示了如何使用 WebSocket 和 Node.js 流构建HTTP隧道工具并传输大数据。

为什么要部署自己的HTTP隧道服务

许多在线服务提供 HTTP 隧道,例如 ngrok,它提供付费的固定公共域来连接本地服务器。它还提供免费的套餐,但仅提供随机域,每次客户端重新启动时都会更改,使得在第三方服务中保存域名不方便。

要获得固定域,您可以在自己的服务器上部署HTTP隧道。ngrok还提供了开源版本用于服务器端部署,但它是一个旧版本1.x,存在一些严重的可靠性问题,不建议用于生产。

此外,使用自己的服务器,您可以确保数据安全。

Lite HTTP Tunnel项目简介

Lite HTTP Tunnel 是一项最近开发的HTTP隧道服务,可以自行托管。您可以使用 Github 存储库中的Deploy按钮部署它并免费获取固定域。

它基于 Express.js 和 Socket.io 构建,仅使用几行代码。它使用 WebSocket 将 HTTP / HTTPS 请求从公共服务器流式传输到本地服务器。

实现

步骤1:在服务器和客户端之间建立 WebSocket 连接

要在服务器端支持 WebSocket 连接,我们使用 socket.io:

const http = require('http');
const express = require('express');
const { Server } = require('socket.io');

const app = express();
const httpServer = http.createServer(app);
const io = new Server(httpServer);

let connectedSocket = null;

io.on('connection', (socket) => {
  console.log('client connected');
  connectedSocket = socket;
  const onMessage = (message) => {
    if (message === 'ping') {
      socket.send('pong');
    }
  }
  const onDisconnect = (reason) => {
    console.log('client disconnected: ', reason);
    connectedSocket = null;
    socket.off('message', onMessage);
    socket.off('error', onError);
  };
  const onError = (e) => {
    connectedSocket = null;
    socket.off('message', onMessage);
    socket.off('disconnect', onDisconnect);
  };
  socket.on('message', onMessage);
  socket.once('disconnect', onDisconnect);
  socket.once('error', onError);
});

httpServer.listen(process.env.PORT);

要在客户端连接 WebSocket:

const { io } = require('socket.io-client');

let socket = null;

function initClient(options) {
  socket = io(options.server, {
    transports: ["websocket"],
    auth: {
      token: options.jwtToken,
    },
  });

  socket.on('connect', () => {
    if (socket.connected) {
      console.log('client connect to server successfully');
    }
  });

  socket.on('connect_error', (e) => {
    console.log('connect error', e && e.message);
  });

  socket.on('disconnect', () => {
    console.log('client disconnected');
  });
}

第二步:使用 JWT Token 保护 WebSocket 连接

在服务器端,我们使用 socket.io 中间件拒绝无效连接:

const jwt = require('jsonwebtoken');

io.use((socket, next) => {
  if (connectedSocket) {
    return next(new Error('Connected error'));
  }
  if (!socket.handshake.auth || !socket.handshake.auth.token){
    next(new Error('Authentication error'));
  }
  jwt.verify(socket.handshake.auth.token, process.env.SECRET_KEY, function(err, decoded) {
    if (err) {
      return next(new Error('Authentication error'));
    }
    if (decoded.token !== process.env.VERIFY_TOKEN) {
      return next(new Error('Authentication error'));
    }
    next();
  });  
});

第 3 步:将请求从服务器传输到客户端

要将请求数据从服务器发送到客户端,我们使用可写流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Writable 类。

const { Writable } = require('stream');

class SocketRequest extends Writable {
  constructor({ socket, requestId, request }) {
    super();
    this._socket = socket;
    this._requestId = requestId;
    this._socket.emit('request', requestId, request);
  }

  _write(chunk, encoding, callback) {
    this._socket.emit('request-pipe', this._requestId, chunk);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _writev(chunks, callback) {
    this._socket.emit('request-pipes', this._requestId, chunks);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _final(callback) {
    this._socket.emit('request-pipe-end', this._requestId);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _destroy(e, callback) {
    if (e) {
      this._socket.emit('request-pipe-error', this._requestId, e && e.message);
      this._socket.conn.once('drain', () => {
        callback();
      });
      return;
    }
    callback();
  }
}

app.use('/', (req, res) => {
  if (!connectedSocket) {
    res.status(404);
    res.send('Not Found');
    return;
  }
  const requestId = uuidV4();
  const socketRequest = new SocketRequest({
    socket: connectedSocket,
    requestId,
    request: {
      method: req.method,
      headers: { ...req.headers },
      path: req.url,
    },
  });
  const onReqError = (e) => {
    socketRequest.destroy(new Error(e || 'Aborted'));
  }
  req.once('aborted', onReqError);
  req.once('error', onReqError);
  req.pipe(socketRequest);
  req.once('finish', () => {
    req.off('aborted', onReqError);
    req.off('error', onReqError);
  });
  // ...
});

要在客户端接收请求数据,我们使用可读流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Readable 类。

onst stream = require('stream');

class SocketRequest extends stream.Readable {
  constructor({ socket, requestId }) {
    super();
    this._socket = socket;
    this._requestId = requestId;
    const onRequestPipe = (requestId, data) => {
      if (this._requestId === requestId) {
        this.push(data);
      }
    };
    const onRequestPipes = (requestId, data) => {
      if (this._requestId === requestId) {
        data.forEach((chunk) => {
          this.push(chunk);
        });
      }
    };
    const onRequestPipeError = (requestId, error) => {
      if (this._requestId === requestId) {
        this._socket.off('request-pipe', onRequestPipe);
        this._socket.off('request-pipes', onRequestPipes);
        this._socket.off('request-pipe-error', onRequestPipeError);
        this._socket.off('request-pipe-end', onRequestPipeEnd);
        this.destroy(new Error(error));
      }
    };
    const onRequestPipeEnd = (requestId, data) => {
      if (this._requestId === requestId) {
        this._socket.off('request-pipe', onRequestPipe);
        this._socket.off('request-pipes', onRequestPipes);
        this._socket.off('request-pipe-error', onRequestPipeError);
        this._socket.off('request-pipe-end', onRequestPipeEnd);
        if (data) {
          this.push(data);
        }
        this.push(null);
      }
    };
    this._socket.on('request-pipe', onRequestPipe);
    this._socket.on('request-pipes', onRequestPipes);
    this._socket.on('request-pipe-error', onRequestPipeError);
    this._socket.on('request-pipe-end', onRequestPipeEnd);
  }

  _read() {}
}

socket.on('request', (requestId, request) => {
  console.log(`${request.method}: `, request.path);
  request.port = options.port;
  request.hostname = options.host;
  const socketRequest = new SocketRequest({
    requestId,
    socket: socket,
  });
  const localReq = http.request(request);
  socketRequest.pipe(localReq);
  const onSocketRequestError = (e) => {
    socketRequest.off('end', onSocketRequestEnd);
    localReq.destroy(e);
  };
  const onSocketRequestEnd = () => {
    socketRequest.off('error', onSocketRequestError);
  };
  socketRequest.once('error', onSocketRequestError);
  socketRequest.once('end', onSocketRequestEnd);
  // ...
});

第四步:从客户端传输响应到服务器

为将响应数据发送到隧道服务器,我们将使用流模块创建可写 stream

const stream = require('stream');

class SocketResponse extends stream.Writable {
  constructor({ socket, responseId }) {
    super();
    this._socket = socket;
    this._responseId = responseId;
  }

  _write(chunk, encoding, callback) {
    this._socket.emit('response-pipe', this._responseId, chunk);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _writev(chunks, callback) {
    this._socket.emit('response-pipes', this._responseId, chunks);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _final(callback) {
    this._socket.emit('response-pipe-end', this._responseId);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _destroy(e, callback) {
    if (e) {
      this._socket.emit('response-pipe-error', this._responseId, e && e.message);
      this._socket.io.engine.once('drain', () => {
        callback();
      });
      return;
    }
    callback();
  }

  writeHead(statusCode, statusMessage, headers) {
    this._socket.emit('response', this._responseId, {
      statusCode,
      statusMessage,
      headers,
    });
  }
}

socket.on('request', (requestId, request) => {
    // ...stream request and send request to local server...
    const onLocalResponse = (localRes) => {
      localReq.off('error', onLocalError);
      const socketResponse = new SocketResponse({
        responseId: requestId,
        socket: socket,
      });
      socketResponse.writeHead(
        localRes.statusCode,
        localRes.statusMessage,
        localRes.headers
      );
      localRes.pipe(socketResponse);
    };
    const onLocalError = (error) => {
      console.log(error);
      localReq.off('response', onLocalResponse);
      socket.emit('request-error', requestId, error && error.message);
      socketRequest.destroy(error);
    };
    localReq.once('error', onLocalError);
    localReq.once('response', onLocalResponse);
  });

为了在隧道服务器中获取响应数据,我们将创建一个可读流。

class SocketResponse extends Readable {
  constructor({ socket, responseId }) {
    super();
    this._socket = socket;
    this._responseId = responseId;
    const onResponse = (responseId, data) => {
      if (this._responseId === responseId) {
        this._socket.off('response', onResponse);
        this._socket.off('request-error', onRequestError);
        this.emit('response', data.statusCode, data.statusMessage, data.headers);
      }
    }
    const onResponsePipe = (responseId, data) => {
      if (this._responseId === responseId) {
        this.push(data);
      }
    };
    const onResponsePipes = (responseId, data) => {
      if (this._responseId === responseId) {
        data.forEach((chunk) => {
          this.push(chunk);
        });
      }
    };
    const onResponsePipeError = (responseId, error) => {
      if (this._responseId !== responseId) {
        return;
      }
      this._socket.off('response-pipe', onResponsePipe);
      this._socket.off('response-pipes', onResponsePipes);
      this._socket.off('response-pipe-error', onResponsePipeError);
      this._socket.off('response-pipe-end', onResponsePipeEnd);
      this.destroy(new Error(error));
    };
    const onResponsePipeEnd = (responseId, data) => {
      if (this._responseId !== responseId) {
        return;
      }
      if (data) {
        this.push(data);
      }
      this._socket.off('response-pipe', onResponsePipe);
      this._socket.off('response-pipes', onResponsePipes);
      this._socket.off('response-pipe-error', onResponsePipeError);
      this._socket.off('response-pipe-end', onResponsePipeEnd);
      this.push(null);
    };
    const onRequestError = (requestId, error) => {
      if (requestId === this._responseId) {
        this._socket.off('request-error', onRequestError);
        this._socket.off('response', onResponse);
        this._socket.off('response-pipe', onResponsePipe);
        this._socket.off('response-pipes', onResponsePipes);
        this._socket.off('response-pipe-error', onResponsePipeError);
        this._socket.off('response-pipe-end', onResponsePipeEnd);
        this.emit('requestError', error);
      }
    };
    this._socket.on('response', onResponse);
    this._socket.on('response-pipe', onResponsePipe);
    this._socket.on('response-pipes', onResponsePipes);
    this._socket.on('response-pipe-error', onResponsePipeError);
    this._socket.on('response-pipe-end', onResponsePipeEnd);
    this._socket.on('request-error', onRequestError);
  }

  _read(size) {}
}

app.use('/', (req, res) => {
  // ... stream request to tunnel client
  const onResponse = (statusCode, statusMessage, headers) => {
    socketRequest.off('requestError', onRequestError)
    res.writeHead(statusCode, statusMessage, headers);
  };
  socketResponse.once('requestError', onRequestError)
  socketResponse.once('response', onResponse);
  socketResponse.pipe(res);
  const onSocketError = () => {
    res.end(500);
  };
  socketResponse.once('error', onSocketError);
  connectedSocket.once('close', onSocketError)
  res.once('close', () => {
    connectedSocket.off('close', onSocketError);
    socketResponse.off('error', onSocketError);
  });
});

在完成以上所有步骤后,我们已经支持将 HTTP 请求流式传输到本地计算机,并从本地服务器发送响应到原始请求。这是一种轻量级的解决方案,但稳定性高且易于部署在任何 “Node.js” 环境中。

第六步:部署HTTP隧道服务

我们可以将HTTP隧道服务部署到云提供商(如 Heroku)。项目 Lite HTTP Tunnel 在Github存储库中包含一个 Heroku/Render 按钮,可让您快速将服务部署到 Heroku/Render。

更多信息

在本文中,我们学习了如何基于 WebSocket 和 Node.js 流构建HTTP隧道工具。使用此工具,我们可以将本地开发服务器暴露到 Internet,并从第三方服务接收 Webhook 消息。我们还了解了如何使用 Node.js 流在客户端和服务器之间传输大量数据。

英文文章:https://dev.to/embbnux/building-a-http-tunnel-with-websocket-and-nodejs-4bp5

基于 github tag 与 travis 构建 npm 自动化 release 系统

最近发了个 npm package (koa-flash-message) 用于 kails 的 flash message 管理,发现手动 publish 还是比较麻烦了,所以决定利用 github tag 和 travis ci 来构建基于 tag 的自动化 publish 系统。
继续阅读基于 github tag 与 travis 构建 npm 自动化 release 系统

前后端分离-利用 koa 实现基于 node.js 的 web 高性能中间层

之前讲了利用 koa 实现了类似 rails 的 web 开发 mvc 项目 kails。 但是在今天的 web 开发中,大家都讲究前后端分离以及微服务,传统的 MVC 已经不能满足很多人胃口了。 所以今天再讲下如何利用 koa 构建高性能的 web 中间层,实现大前端概念下的前后端分离。

继续阅读前后端分离-利用 koa 实现基于 node.js 的 web 高性能中间层

使用 Travis CI 为开源项目 kails 构建自动化 CI 系统

Kails 开源后得到许多的朋友的关注,也就希望把 kails 做成比较标准的开源项目。想到的首先是给项目加测试代码,然后就是利用 CI 工具来自动跑测试,实现 自动化测试每一个新功能。由于是开源项目所以可以直接使用 travis ci, 它对开源项目是免费的,而由于它是基于 docker 来配置测试环境,非常的简单省心。

继续阅读使用 Travis CI 为开源项目 kails 构建自动化 CI 系统

docker容器内网络请求缓慢问题解决

在使用docker的过程中发现了几个问题,在docker里进行的网络请求经常会失败,比如npm install以及bundle install等操作,或者是作为中间层在应用中去获取api数据的过程经常会出现timeout等情况,所以开始探究docker的网络机制,以解决网络请求太慢的问题。
继续阅读docker容器内网络请求缓慢问题解决

[Kails]一个基于Koa2构建的类似于Rails的nodejs开源项目

最近研究了下 Koa2 框架,喜爱其中间件的思想。但是发现实在是太简洁了,只有基本功能,虽然可以方便搭各种服务,但是离可以适应快速开发的网站框架还是有点距离。于是参考 Rails 的大致框架搭建了个网站框架 kails, 配合 postgres 和 redis, 实现了 MVC 架构,前端 webpack,react前后端同构等网站开发基本框架。本文主要介绍 kails 搭建中的各种技术栈和思想。
继续阅读[Kails]一个基于Koa2构建的类似于Rails的nodejs开源项目

Rails使用负载均衡后获取用户ip错误问题解决

网站的流量越来越大后开始使用负载均衡来提高网站的并发数,负载均衡有很多选择,可以使用现成的slb产品,也可以使用nginx进行代理转发流量,使用后发现一个问题,服务器上获取的用户ip变成负载均衡机器的ip了,这里记录一下这个问题的解决。
继续阅读Rails使用负载均衡后获取用户ip错误问题解决

基于docker安装jenkins的配置和使用

最近更换CI,多方研究选上jenkins作为新的CI,这里简要介绍使用docker搭建配置jenkins的流程,以及遇到的一些坑。直接利用docker镜像跑jenkins,不仅简化了jenkins的安装和配置,而且再也不用担心换机器还要重复配置半天了。
继续阅读基于docker安装jenkins的配置和使用

重构-基于Rails:Engine的Rails网站系统拆分

网站做着做着就会发现整个系统越来越庞大,如电商网站你需要有电商系统和后台管理系统,甚至还有仓储系统或者API系统,把这些系统都放在一个网站应用里就会发现网站代码异常庞大,而且每个系统的更新频率会被限制成一样,后台系统无法实现快速开发,所以也就想到了网站的系统服务。现在比较流行网站系统拆分是微服务架构,但是这是比较复杂的拆分,一般网站还到达不了这个地步,这篇介绍基于Rails:Engine的Rails网站系统拆分。

继续阅读重构-基于Rails:Engine的Rails网站系统拆分

使用docker快速构建rails开发环境

发现最近国内获取Docker镜像速度明显上升,所以开始正式的玩Docker了,这篇文章介绍使用Docker来快速构建Rails开发环境,系统环境为MAC OSX, Rails依赖于Redis以及Postgresql.所以主要包括Rails以及Redis和Postgres三大组件。 使用的Docker工具为docker-compose.

继续阅读使用docker快速构建rails开发环境