使用 WebSocket 和 Node.JS Stream 构建 HTTP 隧道

在开发与第三方服务集成的应用程序或机器人时,通常需要将本地开发服务器暴露在 Internet 上以接收 Webhook 消息。要实现这一点,需要为本地服务器创建 HTTP 隧道。本文演示了如何使用 WebSocket 和 Node.js 流构建HTTP隧道工具并传输大数据。

为什么要部署自己的HTTP隧道服务

许多在线服务提供 HTTP 隧道,例如 ngrok,它提供付费的固定公共域来连接本地服务器。它还提供免费的套餐,但仅提供随机域,每次客户端重新启动时都会更改,使得在第三方服务中保存域名不方便。

要获得固定域,您可以在自己的服务器上部署HTTP隧道。ngrok还提供了开源版本用于服务器端部署,但它是一个旧版本1.x,存在一些严重的可靠性问题,不建议用于生产。

此外,使用自己的服务器,您可以确保数据安全。

Lite HTTP Tunnel项目简介

Lite HTTP Tunnel 是一项最近开发的HTTP隧道服务,可以自行托管。您可以使用 Github 存储库中的Deploy按钮部署它并免费获取固定域。

它基于 Express.js 和 Socket.io 构建,仅使用几行代码。它使用 WebSocket 将 HTTP / HTTPS 请求从公共服务器流式传输到本地服务器。

实现

步骤1:在服务器和客户端之间建立 WebSocket 连接

要在服务器端支持 WebSocket 连接,我们使用 socket.io:

const http = require('http');
const express = require('express');
const { Server } = require('socket.io');

const app = express();
const httpServer = http.createServer(app);
const io = new Server(httpServer);

let connectedSocket = null;

io.on('connection', (socket) => {
  console.log('client connected');
  connectedSocket = socket;
  const onMessage = (message) => {
    if (message === 'ping') {
      socket.send('pong');
    }
  }
  const onDisconnect = (reason) => {
    console.log('client disconnected: ', reason);
    connectedSocket = null;
    socket.off('message', onMessage);
    socket.off('error', onError);
  };
  const onError = (e) => {
    connectedSocket = null;
    socket.off('message', onMessage);
    socket.off('disconnect', onDisconnect);
  };
  socket.on('message', onMessage);
  socket.once('disconnect', onDisconnect);
  socket.once('error', onError);
});

httpServer.listen(process.env.PORT);

要在客户端连接 WebSocket:

const { io } = require('socket.io-client');

let socket = null;

function initClient(options) {
  socket = io(options.server, {
    transports: ["websocket"],
    auth: {
      token: options.jwtToken,
    },
  });

  socket.on('connect', () => {
    if (socket.connected) {
      console.log('client connect to server successfully');
    }
  });

  socket.on('connect_error', (e) => {
    console.log('connect error', e && e.message);
  });

  socket.on('disconnect', () => {
    console.log('client disconnected');
  });
}

第二步:使用 JWT Token 保护 WebSocket 连接

在服务器端,我们使用 socket.io 中间件拒绝无效连接:

const jwt = require('jsonwebtoken');

io.use((socket, next) => {
  if (connectedSocket) {
    return next(new Error('Connected error'));
  }
  if (!socket.handshake.auth || !socket.handshake.auth.token){
    next(new Error('Authentication error'));
  }
  jwt.verify(socket.handshake.auth.token, process.env.SECRET_KEY, function(err, decoded) {
    if (err) {
      return next(new Error('Authentication error'));
    }
    if (decoded.token !== process.env.VERIFY_TOKEN) {
      return next(new Error('Authentication error'));
    }
    next();
  });  
});

第 3 步:将请求从服务器传输到客户端

要将请求数据从服务器发送到客户端,我们使用可写流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Writable 类。

const { Writable } = require('stream');

class SocketRequest extends Writable {
  constructor({ socket, requestId, request }) {
    super();
    this._socket = socket;
    this._requestId = requestId;
    this._socket.emit('request', requestId, request);
  }

  _write(chunk, encoding, callback) {
    this._socket.emit('request-pipe', this._requestId, chunk);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _writev(chunks, callback) {
    this._socket.emit('request-pipes', this._requestId, chunks);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _final(callback) {
    this._socket.emit('request-pipe-end', this._requestId);
    this._socket.conn.once('drain', () => {
      callback();
    });
  }

  _destroy(e, callback) {
    if (e) {
      this._socket.emit('request-pipe-error', this._requestId, e && e.message);
      this._socket.conn.once('drain', () => {
        callback();
      });
      return;
    }
    callback();
  }
}

app.use('/', (req, res) => {
  if (!connectedSocket) {
    res.status(404);
    res.send('Not Found');
    return;
  }
  const requestId = uuidV4();
  const socketRequest = new SocketRequest({
    socket: connectedSocket,
    requestId,
    request: {
      method: req.method,
      headers: { ...req.headers },
      path: req.url,
    },
  });
  const onReqError = (e) => {
    socketRequest.destroy(new Error(e || 'Aborted'));
  }
  req.once('aborted', onReqError);
  req.once('error', onReqError);
  req.pipe(socketRequest);
  req.once('finish', () => {
    req.off('aborted', onReqError);
    req.off('error', onReqError);
  });
  // ...
});

要在客户端接收请求数据,我们使用可读流。以下代码实现了 SocketRequest 类,该类扩展了Node.js内置 stream 模块中的 Readable 类。

onst stream = require('stream');

class SocketRequest extends stream.Readable {
  constructor({ socket, requestId }) {
    super();
    this._socket = socket;
    this._requestId = requestId;
    const onRequestPipe = (requestId, data) => {
      if (this._requestId === requestId) {
        this.push(data);
      }
    };
    const onRequestPipes = (requestId, data) => {
      if (this._requestId === requestId) {
        data.forEach((chunk) => {
          this.push(chunk);
        });
      }
    };
    const onRequestPipeError = (requestId, error) => {
      if (this._requestId === requestId) {
        this._socket.off('request-pipe', onRequestPipe);
        this._socket.off('request-pipes', onRequestPipes);
        this._socket.off('request-pipe-error', onRequestPipeError);
        this._socket.off('request-pipe-end', onRequestPipeEnd);
        this.destroy(new Error(error));
      }
    };
    const onRequestPipeEnd = (requestId, data) => {
      if (this._requestId === requestId) {
        this._socket.off('request-pipe', onRequestPipe);
        this._socket.off('request-pipes', onRequestPipes);
        this._socket.off('request-pipe-error', onRequestPipeError);
        this._socket.off('request-pipe-end', onRequestPipeEnd);
        if (data) {
          this.push(data);
        }
        this.push(null);
      }
    };
    this._socket.on('request-pipe', onRequestPipe);
    this._socket.on('request-pipes', onRequestPipes);
    this._socket.on('request-pipe-error', onRequestPipeError);
    this._socket.on('request-pipe-end', onRequestPipeEnd);
  }

  _read() {}
}

socket.on('request', (requestId, request) => {
  console.log(`${request.method}: `, request.path);
  request.port = options.port;
  request.hostname = options.host;
  const socketRequest = new SocketRequest({
    requestId,
    socket: socket,
  });
  const localReq = http.request(request);
  socketRequest.pipe(localReq);
  const onSocketRequestError = (e) => {
    socketRequest.off('end', onSocketRequestEnd);
    localReq.destroy(e);
  };
  const onSocketRequestEnd = () => {
    socketRequest.off('error', onSocketRequestError);
  };
  socketRequest.once('error', onSocketRequestError);
  socketRequest.once('end', onSocketRequestEnd);
  // ...
});

第四步:从客户端传输响应到服务器

为将响应数据发送到隧道服务器,我们将使用流模块创建可写 stream

const stream = require('stream');

class SocketResponse extends stream.Writable {
  constructor({ socket, responseId }) {
    super();
    this._socket = socket;
    this._responseId = responseId;
  }

  _write(chunk, encoding, callback) {
    this._socket.emit('response-pipe', this._responseId, chunk);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _writev(chunks, callback) {
    this._socket.emit('response-pipes', this._responseId, chunks);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _final(callback) {
    this._socket.emit('response-pipe-end', this._responseId);
    this._socket.io.engine.once('drain', () => {
      callback();
    });
  }

  _destroy(e, callback) {
    if (e) {
      this._socket.emit('response-pipe-error', this._responseId, e && e.message);
      this._socket.io.engine.once('drain', () => {
        callback();
      });
      return;
    }
    callback();
  }

  writeHead(statusCode, statusMessage, headers) {
    this._socket.emit('response', this._responseId, {
      statusCode,
      statusMessage,
      headers,
    });
  }
}

socket.on('request', (requestId, request) => {
    // ...stream request and send request to local server...
    const onLocalResponse = (localRes) => {
      localReq.off('error', onLocalError);
      const socketResponse = new SocketResponse({
        responseId: requestId,
        socket: socket,
      });
      socketResponse.writeHead(
        localRes.statusCode,
        localRes.statusMessage,
        localRes.headers
      );
      localRes.pipe(socketResponse);
    };
    const onLocalError = (error) => {
      console.log(error);
      localReq.off('response', onLocalResponse);
      socket.emit('request-error', requestId, error && error.message);
      socketRequest.destroy(error);
    };
    localReq.once('error', onLocalError);
    localReq.once('response', onLocalResponse);
  });

为了在隧道服务器中获取响应数据,我们将创建一个可读流。

class SocketResponse extends Readable {
  constructor({ socket, responseId }) {
    super();
    this._socket = socket;
    this._responseId = responseId;
    const onResponse = (responseId, data) => {
      if (this._responseId === responseId) {
        this._socket.off('response', onResponse);
        this._socket.off('request-error', onRequestError);
        this.emit('response', data.statusCode, data.statusMessage, data.headers);
      }
    }
    const onResponsePipe = (responseId, data) => {
      if (this._responseId === responseId) {
        this.push(data);
      }
    };
    const onResponsePipes = (responseId, data) => {
      if (this._responseId === responseId) {
        data.forEach((chunk) => {
          this.push(chunk);
        });
      }
    };
    const onResponsePipeError = (responseId, error) => {
      if (this._responseId !== responseId) {
        return;
      }
      this._socket.off('response-pipe', onResponsePipe);
      this._socket.off('response-pipes', onResponsePipes);
      this._socket.off('response-pipe-error', onResponsePipeError);
      this._socket.off('response-pipe-end', onResponsePipeEnd);
      this.destroy(new Error(error));
    };
    const onResponsePipeEnd = (responseId, data) => {
      if (this._responseId !== responseId) {
        return;
      }
      if (data) {
        this.push(data);
      }
      this._socket.off('response-pipe', onResponsePipe);
      this._socket.off('response-pipes', onResponsePipes);
      this._socket.off('response-pipe-error', onResponsePipeError);
      this._socket.off('response-pipe-end', onResponsePipeEnd);
      this.push(null);
    };
    const onRequestError = (requestId, error) => {
      if (requestId === this._responseId) {
        this._socket.off('request-error', onRequestError);
        this._socket.off('response', onResponse);
        this._socket.off('response-pipe', onResponsePipe);
        this._socket.off('response-pipes', onResponsePipes);
        this._socket.off('response-pipe-error', onResponsePipeError);
        this._socket.off('response-pipe-end', onResponsePipeEnd);
        this.emit('requestError', error);
      }
    };
    this._socket.on('response', onResponse);
    this._socket.on('response-pipe', onResponsePipe);
    this._socket.on('response-pipes', onResponsePipes);
    this._socket.on('response-pipe-error', onResponsePipeError);
    this._socket.on('response-pipe-end', onResponsePipeEnd);
    this._socket.on('request-error', onRequestError);
  }

  _read(size) {}
}

app.use('/', (req, res) => {
  // ... stream request to tunnel client
  const onResponse = (statusCode, statusMessage, headers) => {
    socketRequest.off('requestError', onRequestError)
    res.writeHead(statusCode, statusMessage, headers);
  };
  socketResponse.once('requestError', onRequestError)
  socketResponse.once('response', onResponse);
  socketResponse.pipe(res);
  const onSocketError = () => {
    res.end(500);
  };
  socketResponse.once('error', onSocketError);
  connectedSocket.once('close', onSocketError)
  res.once('close', () => {
    connectedSocket.off('close', onSocketError);
    socketResponse.off('error', onSocketError);
  });
});

在完成以上所有步骤后,我们已经支持将 HTTP 请求流式传输到本地计算机,并从本地服务器发送响应到原始请求。这是一种轻量级的解决方案,但稳定性高且易于部署在任何 “Node.js” 环境中。

第六步:部署HTTP隧道服务

我们可以将HTTP隧道服务部署到云提供商(如 Heroku)。项目 Lite HTTP Tunnel 在Github存储库中包含一个 Heroku/Render 按钮,可让您快速将服务部署到 Heroku/Render。

更多信息

在本文中,我们学习了如何基于 WebSocket 和 Node.js 流构建HTTP隧道工具。使用此工具,我们可以将本地开发服务器暴露到 Internet,并从第三方服务接收 Webhook 消息。我们还了解了如何使用 Node.js 流在客户端和服务器之间传输大量数据。

英文文章:https://dev.to/embbnux/building-a-http-tunnel-with-websocket-and-nodejs-4bp5

使用 Tensorflow 构建与训练基于 Transformer 算法的宋词生成小程序(一)

学完 DeepLearning 系列课程后就想做一个小应用试试。一开始基于 RNN 训练了个古诗机器人,这个做的人很多,于是换了宋词。宋词是基于词牌生成,相对数据量比较少,基于 RNN 训练效果很一般。后来了解到了 Transformer 算法感觉发现了生机,训练了一下感觉效果真是不错。

使用效果测试的话可以直接扫码访问这个小程序玩玩, 或者小程序搜索 【小诗一首】:

小诗一首-宋词


这是生成的例子:

小诗一首-宋词-例子

Transformer (Attention Is All You Need) 介绍

Transformer 是 Google 于 2017 年在 Attention Is All You Need 论文中提出,基于 Attention 机制,在机器翻译及其他语言理解任务有着以往无法比拟的效果,后 2018 年又提出 Universal Transformer, 在多项有难度的语言理解任务上泛化效果明显更好。Transformer 解决 RNN 存在的长序列信息丢失的问题,像宋词这种讲究格式讲究押韵的语句,在 RNN 中就很难训练到好的效果,但是 Transformer 就可以很好的提取训练宋词中的格式以及押韵特征。

更多

想深入学习 Transformer 以及算法可以看 Tensorflow 官方的教程. 等下篇文章我也详细介绍如何基于 Transformer 训练一个宋词机器人,包括如何预处理训练数据,如何使用模型生成序列文本。

基于 github tag 与 travis 构建 npm 自动化 release 系统

最近发了个 npm package (koa-flash-message) 用于 kails 的 flash message 管理,发现手动 publish 还是比较麻烦了,所以决定利用 github tag 和 travis ci 来构建基于 tag 的自动化 publish 系统。
继续阅读基于 github tag 与 travis 构建 npm 自动化 release 系统

前后端分离-利用 koa 实现基于 node.js 的 web 高性能中间层

之前讲了利用 koa 实现了类似 rails 的 web 开发 mvc 项目 kails。 但是在今天的 web 开发中,大家都讲究前后端分离以及微服务,传统的 MVC 已经不能满足很多人胃口了。 所以今天再讲下如何利用 koa 构建高性能的 web 中间层,实现大前端概念下的前后端分离。

继续阅读前后端分离-利用 koa 实现基于 node.js 的 web 高性能中间层

使用 Travis CI 为开源项目 kails 构建自动化 CI 系统

Kails 开源后得到许多的朋友的关注,也就希望把 kails 做成比较标准的开源项目。想到的首先是给项目加测试代码,然后就是利用 CI 工具来自动跑测试,实现 自动化测试每一个新功能。由于是开源项目所以可以直接使用 travis ci, 它对开源项目是免费的,而由于它是基于 docker 来配置测试环境,非常的简单省心。

继续阅读使用 Travis CI 为开源项目 kails 构建自动化 CI 系统

docker容器内网络请求缓慢问题解决

在使用docker的过程中发现了几个问题,在docker里进行的网络请求经常会失败,比如npm install以及bundle install等操作,或者是作为中间层在应用中去获取api数据的过程经常会出现timeout等情况,所以开始探究docker的网络机制,以解决网络请求太慢的问题。
继续阅读docker容器内网络请求缓慢问题解决

[Kails]一个基于Koa2构建的类似于Rails的nodejs开源项目

最近研究了下 Koa2 框架,喜爱其中间件的思想。但是发现实在是太简洁了,只有基本功能,虽然可以方便搭各种服务,但是离可以适应快速开发的网站框架还是有点距离。于是参考 Rails 的大致框架搭建了个网站框架 kails, 配合 postgres 和 redis, 实现了 MVC 架构,前端 webpack,react前后端同构等网站开发基本框架。本文主要介绍 kails 搭建中的各种技术栈和思想。
继续阅读[Kails]一个基于Koa2构建的类似于Rails的nodejs开源项目

Rails使用负载均衡后获取用户ip错误问题解决

网站的流量越来越大后开始使用负载均衡来提高网站的并发数,负载均衡有很多选择,可以使用现成的slb产品,也可以使用nginx进行代理转发流量,使用后发现一个问题,服务器上获取的用户ip变成负载均衡机器的ip了,这里记录一下这个问题的解决。
继续阅读Rails使用负载均衡后获取用户ip错误问题解决

基于docker安装jenkins的配置和使用

最近更换CI,多方研究选上jenkins作为新的CI,这里简要介绍使用docker搭建配置jenkins的流程,以及遇到的一些坑。直接利用docker镜像跑jenkins,不仅简化了jenkins的安装和配置,而且再也不用担心换机器还要重复配置半天了。
继续阅读基于docker安装jenkins的配置和使用

生产环境使用docker部署rails应用puma和sidekiq

有幸拿到docker beta的测试资格, 在Mac OSX下使用docker更加方便好玩了。这篇博文介绍如何在生产环境也就是线上利用docker实现快速部署以及横向扩展,为大规模负载均衡做准备。这里使用一个docker容器来跑rails应用,另一个容器来跑异步队列sidekiq等服务, 数据库和redis使用RDS和云redis,直接使用docker镜像的数据库也可以。
继续阅读生产环境使用docker部署rails应用puma和sidekiq