P2P通信とは
クライアント・サーバー型では、すべての通信がサーバーを経由します。
Client → Server ← Client
P2Pでは、各ノードが直接通信する形です。
Peer ↔ Peer ↔ Peer
要は、各ノードがサーバーとクライアントの両方の役割を担うようになっていると考えておきましょう。
では、ここから実際のコードで学んでみましょう。
基本実装
シンプルなピアの実装
最小構成でP2Pネットワークを実装
// peer.ts
import WebSocket from 'ws';
class SimplePeer {
protected peers: WebSocket[] = [];
private server: WebSocket.Server;
constructor(public port: number) {
this.server = new WebSocket.Server({ port });
this.server.on('connection', (ws) => {
this.peers.push(ws);
ws.on('message', (data) => this.broadcast(data.toString(), ws));
ws.on('close', () => this.removePeer(ws));
});
}
connectTo(peerPort: number, retryInterval = 1000) {
const ws = new WebSocket(`ws://localhost:${peerPort}`);
ws.on('open', () => {
this.peers.push(ws);
console.log(`ポート ${peerPort} に接続しました`);
});
ws.on('message', (data) => {
console.log(`受信: ${data}`);
});
ws.on('close', () => this.removePeer(ws));
ws.on('error', (err) => {
console.log(`ポート ${peerPort} への接続失敗。${retryInterval / 1000}秒後に再接続します`);
setTimeout(() => this.connectTo(peerPort, retryInterval), retryInterval);
});
}
broadcast(message: string, sender?: WebSocket) {
this.peers.forEach(peer => {
if (peer !== sender && peer.readyState === WebSocket.OPEN) {
peer.send(message);
}
});
}
protected removePeer(ws: WebSocket) {
const index = this.peers.indexOf(ws);
if (index > -1) {
this.peers.splice(index, 1);
}
}
}
動作確認
2つのノードを起動して接続を確認
// node1.ts
import { SimplePeer } from './peer';
const peer1 = new SimplePeer(4001);
console.log('Node1 started on port 4001');
// 1秒後にnode2に接続
setTimeout(() => {
peer1.connectTo(4002);
}, 1000);
// node2.ts
import { SimplePeer } from './peer';
const peer2 = new SimplePeer(4002);
console.log('Node2 started on port 4002');
setTimeout(() => {
peer2.connectTo(4001);
}, 1000);
P2P通信の課題と解決策
ピア発見の仕組み
新しいピアがネットワークに参加する際、既存のピアを見つける必要がある
// discovery.ts
class PeerDiscovery {
private knownPeers = new Set<string>();
private bootstrapNodes = ['localhost:4001', 'localhost:4002'];
constructor() {
this.bootstrapNodes.forEach(node => this.knownPeers.add(node));
}
async discoverPeers(): Promise<string[]> {
const discoveredPeers: string[] = [];
for (const peer of this.knownPeers) {
try {
const peerList = await this.requestPeerList(peer);
peerList.forEach(p => {
if (!this.knownPeers.has(p)) {
this.knownPeers.add(p);
discoveredPeers.push(p);
}
});
} catch (error) {
console.log(`${peer} への接続に失敗しました`);
}
}
return discoveredPeers;
}
private async requestPeerList(peerAddress: string): Promise<string[]> {
// 実際の実装では、WebSocketやHTTPでピアリストを要求
return new Promise((resolve) => {
setTimeout(() => resolve(['localhost:3003', 'localhost:3004']), 100);
});
}
}
データ整合性の管理
複数のピアが同じデータを持つ場合、どの値が正しいかを判断する仕組みが必要
// consensus.ts
class SimpleConsensus {
private peers: SimplePeer[];
constructor(peers: SimplePeer[]) {
this.peers = peers;
}
async getConsensusValue(key: string): Promise<any> {
const responses = await this.collectValues(key);
return this.findMajorityValue(responses);
}
private async collectValues(key: string): Promise<any[]> {
const promises = this.peers.map(peer =>
this.requestValue(peer, key)
);
// タイムアウト付きで値を収集
return Promise.allSettled(promises).then(results =>
results
.filter(r => r.status === 'fulfilled')
.map(r => (r as PromiseFulfilledResult<any>).value)
);
}
private findMajorityValue(values: any[]): any {
const counts = new Map<any, number>();
values.forEach(value => {
const current = counts.get(value) || 0;
counts.set(value, current + 1);
});
let maxCount = 0;
let majorityValue = null;
counts.forEach((count, value) => {
if (count > maxCount) {
maxCount = count;
majorityValue = value;
}
});
return majorityValue;
}
private async requestValue(peer: SimplePeer, key: string): Promise<any> {
// 実装省略 - ピアに値を要求する処理
return null;
}
}
実践例:P2Pチャットシステム
基本実装を拡張してチャットシステムを作成します:
// p2p-chat.ts
import readline from 'readline';
import { SimplePeer } from './peer';
class P2PChat extends SimplePeer {
private rl?: readline.Interface;
private nickname: string;
constructor(port: number, nickname: string) {
super(port);
this.nickname = nickname;
this.setupInterface();
this.setupMessageHandler();
}
private setupInterface() {
this.rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
prompt: `${this.nickname}> `
});
console.log(`チャットに参加しました(${this.nickname})`);
console.log('メッセージを入力してEnterキーを押してください');
this.rl?.prompt();
this.rl.on('line', (input) => {
if (input.trim()) {
const message = JSON.stringify({
type: 'chat',
nickname: this.nickname,
content: input.trim(),
timestamp: Date.now()
});
this.broadcast(message);
this.displayMessage(this.nickname, input.trim());
}
this.rl?.prompt();
});
this.rl?.prompt();
}
private setupMessageHandler() {
// 親クラスの自動broadcastリスナーを削除して、チャット専用リスナーに置き換え
this.server.removeAllListeners('connection');
this.server.on('connection', (ws) => {
this.peers.push(ws);
ws.on('message', (data) => {
try {
const message = JSON.parse(data.toString());
if (message.type === 'chat') {
this.displayMessage(message.nickname, message.content);
}
} catch (error) {
console.log('無効なメッセージを受信しました');
}
});
ws.on('close', () => this.removePeer(ws));
});
}
private displayMessage(nickname: string, content: string) {
const timestamp = new Date().toLocaleTimeString();
console.log(`\n[${timestamp}] ${nickname}: ${content}`);
this.rl?.prompt();
}
shutdown() {
this.rl?.close();
this.server.close();
}
}
チャットの使用例
// chat-node1.ts
import { P2PChat } from './p2p-chat';
const chat1 = new P2PChat(4001, 'Alice');
// 他のノードに接続
setTimeout(() => {
chat1.connectTo(4002);
}, 1000);
process.on('SIGINT', () => {
chat1.shutdown();
process.exit(0);
});
// chat-node2.ts
import { P2PChat } from './p2p-chat';
const chat2 = new P2PChat(4002, 'Bob');
setTimeout(() => {
chat2.connectTo(4001);
}, 1000);
process.on('SIGINT', () => {
chat2.shutdown();
process.exit(0);
});
実行環境の構築
パッケージのインストール
npm init -y
npm install ws @types/ws @types/node typescript ts-node
TypeScript設定
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true
}
}
実行方法
ISSUE - 課題
両方のノード(chat-node1.ts, chat-node2.ts)を同時に起動してください。
片方だけを先に起動すると、接続先が存在しないため ECONNREFUSED
エラーが発生します。
# ターミナル1
npx ts-node chat-node1.ts
# ターミナル2
npx ts-node chat-node2.ts
まとめ
P2P通信の基本的な動作をなるべく体系的に説明してみました。実際のプロダクトでは、ネットワークだったりNAT越えだったり必要になりますが、この基礎を理解することで、P2Pネットワークの動作を把握できるかとおもいます。