Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major update: port mapping, vless outbound, and more #190

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
23eb8b7
Abstract worker-vless.js, create launcher for node
Jun 27, 2023
c7ee1a0
Fix indentation
Jun 27, 2023
0a6d83a
Move NodeJS launcher to a separated folder
Jun 27, 2023
d7cf50e
Archieve worker-vless.js and prepare to revert
Jun 27, 2023
30c01a8
Abstract socks5 version
Jun 27, 2023
7b9b464
Better logging
Jun 27, 2023
40100e1
Revert worker-vless.js
Jun 27, 2023
6869e32
Introduce fallback and globalConfig
Jun 28, 2023
3e70643
Add fallback and code clean up
Jun 28, 2023
00f4d39
Better websocket close logic
Jun 28, 2023
eaeb94a
Fix tryOutbound
Jun 28, 2023
6dad634
Add vless header maker
Jun 28, 2023
2e4208f
Impl Vless client without error handling
Jun 28, 2023
c69f300
Add some error handling to vless
Jun 28, 2023
7adaeef
Add Vless string parser
Jun 28, 2023
89a5bc9
Better Vless outbound
Jun 29, 2023
d1c4fca
More comments
Jun 29, 2023
f6eecad
Fix vless on workers
Jun 29, 2023
6a1ae11
Allowing the worker to push log to a remote server
Jun 30, 2023
7637d59
Support UDP tunneling via vless
Jul 1, 2023
29542c5
Add UDP outbound support if run on node
Jul 3, 2023
e54a093
Support IPv6 UDP outbound
Jul 3, 2023
f8434ba
Simply DNS over TCP implementation
Jul 6, 2023
85301ea
Fix indentation
Jul 9, 2023
54734bd
Fix UDP outbound on discontinued streams
Jul 10, 2023
d0ce27a
Fix DNS over TCP
Jul 10, 2023
a88b9fb
Use Uint8Array throughout the code
Jul 16, 2023
2dbb1cf
Fix earlydata
rikkagcp1 Jul 16, 2023
a63c7d3
Remove the use of Blob
Jul 17, 2023
217ddb3
better shadowrocket compatiblity
Jul 25, 2023
ad473ba
Code clean-up
Jul 25, 2023
d171ec9
More clean up
Jul 25, 2023
1c64027
Less verbose log
Jul 25, 2023
a58b6b2
More code clean up
Jul 25, 2023
ead7a93
Massive code clean-up, add type annotations.
Dec 1, 2023
3d332fb
Restore worker-with-socks5-experimental.js
Dec 1, 2023
6988af0
Add websocket message processor
Dec 2, 2023
9527803
Delay the instantiation of response processor
Dec 2, 2023
d69aad5
Add deno support, improve node support
Dec 3, 2023
dd33548
Fix IPv6 UDP on deno
Dec 3, 2023
c2334b2
Fix IPv6 inbound
Dec 6, 2023
41a7e75
Move type definition, better outbound impl
Dec 12, 2023
06334a4
Fix no 0rtt
Dec 12, 2023
43c2330
Fix zero-length earlyData handling
Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add websocket message processor
  • Loading branch information
rikkagcp1 committed Dec 2, 2023
commit 6988af0cc8de38429eee0f6a89460e478bd6aa50
61 changes: 45 additions & 16 deletions src/worker-neo.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ export const platformAPI = {
* }
*/
associate: null,

/**
* An optional processor to process the incoming WebSocket request and its response.
* @type { null | ((logger: LogFunction) => {
* request: TransformStream<Uint8Array, Uint8Array>,
* response: TransformStream<Uint8Array, Uint8Array>,
* })}
*/
processor: null,
}

/**
Expand Down Expand Up @@ -536,6 +545,15 @@ export function vlessOverWSHandler(webSocket, earlyDataHeader) {

const readableWebSocketStream = makeReadableWebSocketStream(webSocket, earlyData, null, log);

/** @type {null | TransformStream<Uint8Array, Uint8Array>} */
let vlessResponseProcessor = null;
let vlessTrafficData = readableWebSocketStream;
if (platformAPI.processor != null) {
const processor = platformAPI.processor(log);
vlessResponseProcessor = processor.response;
vlessTrafficData = readableWebSocketStream.pipeThrough(processor.request);
}

let vlessHeader = null;

// This source stream only contains raw traffic from the client
Expand Down Expand Up @@ -566,7 +584,8 @@ export function vlessOverWSHandler(webSocket, earlyDataHeader) {
flush(controller){
}
});
const fromClientTraffic = readableWebSocketStream.pipeThrough(vlessHeaderProcessor);

const fromClientTraffic = vlessTrafficData.pipeThrough(vlessHeaderProcessor);

/** @type {WritableStream<Uint8Array> | null}*/
let remoteTrafficSink = null;
Expand All @@ -587,12 +606,10 @@ export function vlessOverWSHandler(webSocket, earlyDataHeader) {
}

// ["version", "length of additional info"]
const vlessResponse = {
header: new Uint8Array([vlessHeader.vlessVersion[0], 0]),
}
const vlessResponseHeader = new Uint8Array([vlessHeader.vlessVersion[0], 0]);

// Need to ensure the outbound proxy (if any) is ready before proceeding.
remoteTrafficSink = await handleOutBound(vlessHeader, chunk, webSocket, vlessResponse, log);
remoteTrafficSink = await handleOutBound(vlessHeader, chunk, webSocket, vlessResponseHeader, vlessResponseProcessor, log);
// log('Outbound established!');
},
close() {
Expand Down Expand Up @@ -620,11 +637,12 @@ export function vlessOverWSHandler(webSocket, earlyDataHeader) {
* @param {{isUDP: boolean, addressType: number, addressRemote: string, portRemote: number}} vlessRequest
* @param {Uint8Array} rawClientData The raw client data to write.
* @param {WebSocket} webSocket The WebSocket to pass the remote socket to.
* @param {{header: Uint8Array}} vlessResponse Contains information to produce the vless response, such as the header.
* @param {Uint8Array} vlessResponseHeader Contains information to produce the vless response, such as the header.
* @param {null | TransformStream<Uint8Array, Uint8Array>} vlessResponseProcessor an optional TransformStream to process the Vless response.
* @param {LogFunction} log The logger function.
* @returns a non-null fulfill indicates the success connection to the destination or the remote proxy server
*/
async function handleOutBound(vlessRequest, rawClientData, webSocket, vlessResponse, log) {
async function handleOutBound(vlessRequest, rawClientData, webSocket, vlessResponseHeader, vlessResponseProcessor, log) {
const curOutBoundPtr = {index: 0, serverIndex: 0};

// Check if we should forward UDP DNS requests to a designated TCP DNS server.
Expand Down Expand Up @@ -812,7 +830,7 @@ async function handleOutBound(vlessRequest, rawClientData, webSocket, vlessRespo
};

const readableStream = makeReadableWebSocketStream(wsToVlessServer, null, headerStripper, log);
const vlessReqHeader = makeVlessReqHeader(vlessRequest.isUDP ? VlessCmd.UDP : VlessCmd.TCP, vlessRequest.addressType, vlessRequest.addressRemote, vlessRequest.portRemote, uuid, rawClientData);
const vlessReqHeader = makeVlessReqHeader(vlessRequest.isUDP ? VlessCmd.UDP : VlessCmd.TCP, vlessRequest.addressType, vlessRequest.addressRemote, vlessRequest.portRemote, uuid);
// Send the first packet (header + rawClientData), then strip the response header with headerStripper
await writeFirstChunk(writableStream, joinUint8Array(vlessReqHeader, rawClientData));
return {
Expand Down Expand Up @@ -859,7 +877,7 @@ async function handleOutBound(vlessRequest, rawClientData, webSocket, vlessRespo
}

if (destRWPair != null) {
const hasIncomingData = await remoteSocketToWS(destRWPair.readableStream, webSocket, vlessResponse, log);
const hasIncomingData = await remoteSocketToWS(destRWPair.readableStream, webSocket, vlessResponseHeader, vlessResponseProcessor, log);
if (hasIncomingData) {
return destRWPair.writableStream;
}
Expand Down Expand Up @@ -1173,11 +1191,12 @@ function processVlessHeader(
* Stream data from the remote destination (any) to the client side (Websocket)
* @param {ReadableStream<Uint8Array>} remoteSocketReader from the remote destination
* @param {WebSocket} webSocket to the client side
* @param {{header: Uint8Array}} vlessResponse Contains information to produce the vless reponse, such as the header.
* @param {Uint8Array} vlessResponseHeader The Vless response header.
* @param {null | TransformStream<Uint8Array, Uint8Array>} vlessResponseProcessor an optional TransformStream to process the Vless response.
* @param {LogFunction} log
* @returns {Promise<boolean>} has hasIncomingData
*/
async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponse, log) {
async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponseHeader, vlessResponseProcessor, log) {
// This promise fulfills if:
// 1. There is any incoming data
// 2. The remoteSocketReader closes without any data
Expand All @@ -1187,7 +1206,9 @@ async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponse, lo
let hasIncomingData = false;

// Add the response header and monitor if there is any traffic coming from the remote host.
remoteSocketReader.pipeThrough(new TransformStream({

/** @type {TransformStream<Uint8Array, Uint8Array>} */
const vlessResponseHeaderPrepender = new TransformStream({
start() {
},
transform(chunk, controller) {
Expand All @@ -1196,7 +1217,7 @@ async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponse, lo
resolve(true);

if (!headerSent) {
controller.enqueue(joinUint8Array(vlessResponse.header, chunk));
controller.enqueue(joinUint8Array(vlessResponseHeader, chunk));
headerSent = true;
} else {
controller.enqueue(chunk);
Expand All @@ -1208,8 +1229,10 @@ async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponse, lo
// The connection has been closed, resolve the promise anyway.
resolve(hasIncomingData);
}
}))
.pipeTo(new WritableStream({
})

/** @type {WritableStream<Uint8Array>} */
const toClientWsSink = new WritableStream({
start() {
},
write(chunk, controller) {
Expand All @@ -1236,7 +1259,13 @@ async function remoteSocketToWS(remoteSocketReader, webSocket, vlessResponse, lo
// abort(reason) {
// console.error(`remoteSocket.readable aborts`, reason);
// },
}))
});

const vlessResponseWithHeader = remoteSocketReader.pipeThrough(vlessResponseHeaderPrepender);
const processedVlessResponse = vlessResponseProcessor == null ? vlessResponseWithHeader :
vlessResponseWithHeader.pipeThrough(vlessResponseProcessor);

processedVlessResponse.pipeTo(toClientWsSink)
.catch((error) => {
console.error(
`remoteSocketToWS has exception, readyState = ${webSocket.readyState} :`,
Expand Down