Php WebSocket Örneği

Php WebSocket Örneği

WebSocket Nedir?

WebSocket, bir sunucu ile bir veya daha fazla istemci arasında düşük gecikmeli (veya hızlı), kalıcı bağlantılardır. AJAX  isteklerinden farklı olarak , WebSocket iki yönlüdür (push-pull), yani hem istemci hem de sunucu gerçek zamanlı olarak birbirini dinleyebilir ve değişikliklere yanıt verebilir.

Örnek Proje

Öncelikle ws.class.php dosyamızı oluşturalım

<?php
class WebSocketUser
{
    public $socket;
    public $id;
    public $headers = array();
    public $handshake = false;

    public $handlingPartialPacket = false;
    public $partialBuffer = "";

    public $sendingContinuous = false;
    public $partialMessage = "";

    public $hasSentClose = false;

    function __construct($id, $socket)
    {
        $this->id = $id;
        $this->socket = $socket;
    }
}

abstract class WebSocketServer
{
    protected $userClass = 'WebSocketUser';
    protected $maxBufferSize;
    protected $master;
    protected $sockets                              = array();
    protected $users                                = array();
    protected $heldMessages                         = array();
    protected $interactive                          = true;
    protected $headerOriginRequired                 = false;
    protected $headerSecWebSocketProtocolRequired   = false;
    protected $headerSecWebSocketExtensionsRequired = false;

    function __construct($addr, $port, $bufferLength = 2048)
    {
        $this->maxBufferSize = $bufferLength;
        $this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)  or die("Failed: socket_create()");
        socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("Failed: socket_option()");
        socket_bind($this->master, $addr, $port)                      or die("Failed: socket_bind()");
        socket_listen($this->master, 20)                               or die("Failed: socket_listen()");
        $this->sockets['m'] = $this->master;
        $this->stdout("Server started\nListening on: $addr:$port\nMaster socket: " . $this->master);
    }

    abstract protected function process($user, $message);
    abstract protected function connected($user);
    abstract protected function closed($user);

    protected function connecting($user)
    {
        //Kullanıcı bağlanmadan hemen önce
    }

    protected function send($user, $message)
    {
        if ($user->handshake) {
            $message = $this->frame($message, $user);
            $result = @socket_write($user->socket, $message, strlen($message));
        } else {
            $holdingMessage = array('user' => $user, 'message' => $message);
            $this->heldMessages[] = $holdingMessage;
        }
    }

    protected function tick()
    {
        // Periyodik olarak gerçekleşmesi gereken herhangi bir işlem için bu fonksiyonu kullanın.
    }

    protected function _tick()
    {
        foreach ($this->heldMessages as $key => $hm) {
            $found = false;
            foreach ($this->users as $currentUser) {
                if ($hm['user']->socket == $currentUser->socket) {
                    $found = true;
                    if ($currentUser->handshake) {
                        unset($this->heldMessages[$key]);
                        $this->send($currentUser, $hm['message']);
                    }
                }
            }
            if (!$found) {
                unset($this->heldMessages[$key]);
            }
        }
    }

    public function run()
    {
        while (true) {
            if (empty($this->sockets)) {
                $this->sockets['m'] = $this->master;
            }
            $read = $this->sockets;
            $write = $except = null;
            $this->_tick();
            $this->tick();
            @socket_select($read, $write, $except, 1);
            foreach ($read as $socket) {
                if ($socket == $this->master) {
                    $client = socket_accept($socket);
                    if ($client < 0) {
                        $this->stderr("Failed: socket_accept()");
                        continue;
                    } else {
                        $this->connect($client);
                        $this->stdout("Client connected. " . $client);
                    }
                } else {
                    $numBytes = @socket_recv($socket, $buffer, $this->maxBufferSize, 0);
                    if ($numBytes === false) {
                        $sockErrNo = socket_last_error($socket);
                        switch ($sockErrNo) {
                            case 102: // ENETRESET    -- Sıfırlama nedeniyle ağ bağlantısı kesildi
                            case 103: // ECONNABORTED - Yazılım bağlantı kesilmesine neden oldu
                            case 104: // ECONNRESET - Bağlantı eş tarafından sıfırlandı
                            case 108: // ESHUTDOWN - Aktarım bitiş noktası kapatıldıktan sonra gönderilemiyor - soket kapatıldıktan sonra yazmaya çalışıyorsak, muhtemelen bizim tarafımızdan bir hata daha var. Muhtemelen kritik bir hata değil.
                            case 110: // ETIMEDOUT - Bağlantı zaman aşımına uğradı
                            case 111: // ECONNREFUSED - Bağlantı reddedildi - Dinlemediğimiz için bunu görmemeliyiz ... Hala kritik bir hata değil.
                            case 112: // EHOSTDOWN - Sunucu çalışmıyor - Yine, bunu görmemeliyiz ve tekrar kritik olmamalı, çünkü bu sadece bir bağlantıdır ve hala / başkalarını dinlemek istiyoruz.
                            case 113: // EHOSTUNREACH - Barınacak yol yok
                            case 121: // EREMOTEIO - Rempte Girdi / Çıktı hatası - Sabit diskleri patladı.
                            case 125: // ECANCELED - İşlem iptal edildi

                                $this->stderr("Unusual disconnect on socket " . $socket);
                                $this->disconnect($socket, true, $sockErrNo); //kendi uygulamasına sahip bir kişinin soketteki hata koşullarını kontrol etmek istemesi durumunda, hatayı silmeden önce bağlantıyı kesin.
                                break;
                            default:

                                $this->stderr('Socket error: ' . socket_strerror($sockErrNo));
                        }
                    } elseif ($numBytes == 0) {
                        $this->disconnect($socket);
                        $this->stderr("Client disconnected. TCP connection lost: " . $socket);
                    } else {
                        $user = $this->getUserBySocket($socket);
                        if (!$user->handshake) {
                            $tmp = str_replace("\r", '', $buffer);
                            if (strpos($tmp, "\n\n") === false) {
                                continue;
                            }
                            $this->doHandshake($user, $buffer);
                        } else {
                            $this->split_packet($numBytes, $buffer, $user);
                        }
                    }
                }
            }
        }
    }

    protected function connect($socket)
    {
        $user = new $this->userClass(uniqid('u'), $socket);
        $this->users[$user->id] = $user;
        $this->sockets[$user->id] = $socket;
        $this->connecting($user);
    }

    protected function disconnect($socket, $triggerClosed = true, $sockErrNo = null)
    {
        $disconnectedUser = $this->getUserBySocket($socket);

        if ($disconnectedUser !== null) {
            unset($this->users[$disconnectedUser->id]);

            if (array_key_exists($disconnectedUser->id, $this->sockets)) {
                unset($this->sockets[$disconnectedUser->id]);
            }

            if (!is_null($sockErrNo)) {
                socket_clear_error($socket);
            }

            if ($triggerClosed) {
                $this->stdout("Client disconnected. " . $disconnectedUser->socket);
                $this->closed($disconnectedUser);
                socket_close($disconnectedUser->socket);
            } else {
                $message = $this->frame('', $disconnectedUser, 'close');
                @socket_write($disconnectedUser->socket, $message, strlen($message));
            }
        }
    }

    protected function doHandshake($user, $buffer)
    {
        $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
        $headers = array();
        $lines = explode("\n", $buffer);
        foreach ($lines as $line) {
            if (strpos($line, ":") !== false) {
                $header = explode(":", $line, 2);
                $headers[strtolower(trim($header[0]))] = trim($header[1]);
            } elseif (stripos($line, "get ") !== false) {
                preg_match("/GET (.*) HTTP/i", $buffer, $reqResource);
                $headers['get'] = trim($reqResource[1]);
            }
        }
        if (isset($headers['get'])) {
            $user->requestedResource = $headers['get'];
        } else {
            $handshakeResponse = "HTTP/1.1 405 Method Not Allowed\r\n\r\n";
        }
        if (!isset($headers['host']) || !$this->checkHost($headers['host'])) {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        }
        if (!isset($headers['upgrade']) || strtolower($headers['upgrade']) != 'websocket') {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        }
        if (!isset($headers['connection']) || strpos(strtolower($headers['connection']), 'upgrade') === FALSE) {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        }
        if (!isset($headers['sec-websocket-key'])) {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        } else {
        }
        if (!isset($headers['sec-websocket-version']) || strtolower($headers['sec-websocket-version']) != 13) {
            $handshakeResponse = "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocketVersion: 13";
        }
        if (($this->headerOriginRequired && !isset($headers['origin'])) || ($this->headerOriginRequired && !$this->checkOrigin($headers['origin']))) {
            $handshakeResponse = "HTTP/1.1 403 Forbidden";
        }
        if (($this->headerSecWebSocketProtocolRequired && !isset($headers['sec-websocket-protocol'])) || ($this->headerSecWebSocketProtocolRequired && !$this->checkWebsocProtocol($headers['sec-websocket-protocol']))) {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        }
        if (($this->headerSecWebSocketExtensionsRequired && !isset($headers['sec-websocket-extensions'])) || ($this->headerSecWebSocketExtensionsRequired && !$this->checkWebsocExtensions($headers['sec-websocket-extensions']))) {
            $handshakeResponse = "HTTP/1.1 400 Bad Request";
        }


        if (isset($handshakeResponse)) {
            socket_write($user->socket, $handshakeResponse, strlen($handshakeResponse));
            $this->disconnect($user->socket);
            return;
        }

        $user->headers = $headers;
        $user->handshake = $buffer;

        $webSocketKeyHash = sha1($headers['sec-websocket-key'] . $magicGUID);

        $rawToken = "";
        for ($i = 0; $i < 20; $i++) {
            $rawToken .= chr(hexdec(substr($webSocketKeyHash, $i * 2, 2)));
        }
        $handshakeToken = base64_encode($rawToken) . "\r\n";

        $subProtocol = (isset($headers['sec-websocket-protocol'])) ? $this->processProtocol($headers['sec-websocket-protocol']) : "";
        $extensions = (isset($headers['sec-websocket-extensions'])) ? $this->processExtensions($headers['sec-websocket-extensions']) : "";

        $handshakeResponse = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $handshakeToken$subProtocol$extensions\r\n";
        socket_write($user->socket, $handshakeResponse, strlen($handshakeResponse));
        $this->connected($user);
    }

    protected function checkHost($hostName)
    {
        return true;
    }

    protected function checkOrigin($origin)
    {
        return true;
    }

    protected function checkWebsocProtocol($protocol)
    {
        return true;
    }

    protected function checkWebsocExtensions($extensions)
    {
        return true;
    }

    protected function processProtocol($protocol)
    {
        return "";
    }

    protected function processExtensions($extensions)
    {
        return "";
    }

    protected function getUserBySocket($socket)
    {
        foreach ($this->users as $user) {
            if ($user->socket == $socket) {
                return $user;
            }
        }
        return null;
    }

    public function stdout($message)
    {
        if ($this->interactive) {
            echo "$message\n";
        }
    }

    public function stderr($message)
    {
        if ($this->interactive) {
            echo "$message\n";
        }
    }

    protected function frame($message, $user, $messageType = 'text', $messageContinues = false)
    {
        switch ($messageType) {
            case 'continuous':
                $b1 = 0;
                break;
            case 'text':
                $b1 = ($user->sendingContinuous) ? 0 : 1;
                break;
            case 'binary':
                $b1 = ($user->sendingContinuous) ? 0 : 2;
                break;
            case 'close':
                $b1 = 8;
                break;
            case 'ping':
                $b1 = 9;
                break;
            case 'pong':
                $b1 = 10;
                break;
        }
        if ($messageContinues) {
            $user->sendingContinuous = true;
        } else {
            $b1 += 128;
            $user->sendingContinuous = false;
        }

        $length = strlen($message);
        $lengthField = "";
        if ($length < 126) {
            $b2 = $length;
        } elseif ($length < 65536) {
            $b2 = 126;
            $hexLength = dechex($length);
            if (strlen($hexLength) % 2 == 1) {
                $hexLength = '0' . $hexLength;
            }
            $n = strlen($hexLength) - 2;

            for ($i = $n; $i >= 0; $i = $i - 2) {
                $lengthField = chr(hexdec(substr($hexLength, $i, 2))) . $lengthField;
            }
            while (strlen($lengthField) < 2) {
                $lengthField = chr(0) . $lengthField;
            }
        } else {
            $b2 = 127;
            $hexLength = dechex($length);
            if (strlen($hexLength) % 2 == 1) {
                $hexLength = '0' . $hexLength;
            }
            $n = strlen($hexLength) - 2;

            for ($i = $n; $i >= 0; $i = $i - 2) {
                $lengthField = chr(hexdec(substr($hexLength, $i, 2))) . $lengthField;
            }
            while (strlen($lengthField) < 8) {
                $lengthField = chr(0) . $lengthField;
            }
        }

        return chr($b1) . chr($b2) . $lengthField . $message;
    }

    protected function split_packet($length, $packet, $user)
    {
        if ($user->handlingPartialPacket) {
            $packet = $user->partialBuffer . $packet;
            $user->handlingPartialPacket = false;
            $length = strlen($packet);
        }
        $fullpacket = $packet;
        $frame_pos = 0;
        $frame_id = 1;

        while ($frame_pos < $length) {
            $headers = $this->extractHeaders($packet);
            $headers_size = $this->calcoffset($headers);
            $framesize = $headers['length'] + $headers_size;

            $frame = substr($fullpacket, $frame_pos, $framesize);

            if (($message = $this->deframe($frame, $user, $headers)) !== FALSE) {
                if ($user->hasSentClose) {
                    $this->disconnect($user->socket);
                } else {
                    if ((preg_match('//u', $message)) || ($headers['opcode'] == 2)) {
                        $this->process($user, $message);
                    } else {
                        $this->stderr("not UTF-8\n");
                    }
                }
            }
            $frame_pos += $framesize;
            $packet = substr($fullpacket, $frame_pos);
            $frame_id++;
        }
    }

    protected function calcoffset($headers)
    {
        $offset = 2;
        if ($headers['hasmask']) {
            $offset += 4;
        }
        if ($headers['length'] > 65535) {
            $offset += 8;
        } elseif ($headers['length'] > 125) {
            $offset += 2;
        }
        return $offset;
    }

    protected function deframe($message, &$user)
    {
        $headers = $this->extractHeaders($message);
        $pongReply = false;
        $willClose = false;
        switch ($headers['opcode']) {
            case 0:
            case 1:
            case 2:
                break;
            case 8:
                $user->hasSentClose = true;
                return "";
            case 9:
                $pongReply = true;
            case 10:
                break;
            default:
                $willClose = true;
                break;
        }

        if ($this->checkRSVBits($headers, $user)) {
            return false;
        }

        if ($willClose) {
            return false;
        }

        $payload = $user->partialMessage . $this->extractPayload($message, $headers);

        if ($pongReply) {
            $reply = $this->frame($payload, $user, 'pong');
            socket_write($user->socket, $reply, strlen($reply));
            return false;
        }
        if ($headers['length'] > strlen($this->applyMask($headers, $payload))) {
            $user->handlingPartialPacket = true;
            $user->partialBuffer = $message;
            return false;
        }

        $payload = $this->applyMask($headers, $payload);

        if ($headers['fin']) {
            $user->partialMessage = "";
            return $payload;
        }
        $user->partialMessage = $payload;
        return false;
    }

    protected function extractHeaders($message)
    {
        $header = array(
            'fin'     => $message[0] & chr(128),
            'rsv1'    => $message[0] & chr(64),
            'rsv2'    => $message[0] & chr(32),
            'rsv3'    => $message[0] & chr(16),
            'opcode'  => ord($message[0]) & 15,
            'hasmask' => $message[1] & chr(128),
            'length'  => 0,
            'mask'    => ""
        );
        $header['length'] = (ord($message[1]) >= 128) ? ord($message[1]) - 128 : ord($message[1]);

        if ($header['length'] == 126) {
            if ($header['hasmask']) {
                $header['mask'] = $message[4] . $message[5] . $message[6] . $message[7];
            }
            $header['length'] = ord($message[2]) * 256
                + ord($message[3]);
        } elseif ($header['length'] == 127) {
            if ($header['hasmask']) {
                $header['mask'] = $message[10] . $message[11] . $message[12] . $message[13];
            }
            $header['length'] = ord($message[2]) * 65536 * 65536 * 65536 * 256
                + ord($message[3]) * 65536 * 65536 * 65536
                + ord($message[4]) * 65536 * 65536 * 256
                + ord($message[5]) * 65536 * 65536
                + ord($message[6]) * 65536 * 256
                + ord($message[7]) * 65536
                + ord($message[8]) * 256
                + ord($message[9]);
        } elseif ($header['hasmask']) {
            $header['mask'] = $message[2] . $message[3] . $message[4] . $message[5];
        }
        return $header;
    }

    protected function extractPayload($message, $headers)
    {
        $offset = 2;
        if ($headers['hasmask']) {
            $offset += 4;
        }
        if ($headers['length'] > 65535) {
            $offset += 8;
        } elseif ($headers['length'] > 125) {
            $offset += 2;
        }
        return substr($message, $offset);
    }

    protected function applyMask($headers, $payload)
    {
        $effectiveMask = "";
        if ($headers['hasmask']) {
            $mask = $headers['mask'];
        } else {
            return $payload;
        }

        while (strlen($effectiveMask) < strlen($payload)) {
            $effectiveMask .= $mask;
        }
        while (strlen($effectiveMask) > strlen($payload)) {
            $effectiveMask = substr($effectiveMask, 0, -1);
        }
        return $effectiveMask ^ $payload;
    }
    protected function checkRSVBits($headers, $user)
    {
        if (ord($headers['rsv1']) + ord($headers['rsv2']) + ord($headers['rsv3']) > 0) {
            return true;
        }
        return false;
    }

    protected function strtohex($str)
    {
        $strout = "";
        for ($i = 0; $i < strlen($str); $i++) {
            $strout .= (ord($str[$i]) < 16) ? "0" . dechex(ord($str[$i])) : dechex(ord($str[$i]));
            $strout .= " ";
            if ($i % 32 == 7) {
                $strout .= ": ";
            }
            if ($i % 32 == 15) {
                $strout .= ": ";
            }
            if ($i % 32 == 23) {
                $strout .= ": ";
            }
            if ($i % 32 == 31) {
                $strout .= "\n";
            }
        }
        return $strout . "\n";
    }

    protected function printHeaders($headers)
    {
        echo "Array\n(\n";
        foreach ($headers as $key => $value) {
            if ($key == 'length' || $key == 'opcode') {
                echo "\t[$key] => $value\n\n";
            } else {
                echo "\t[$key] => " . $this->strtohex($value) . "\n";
            }
        }
        echo ")\n";
    }
}

Şimdi server.php dosyamızı oluşturuyoruz

#!/usr/bin/env php
<?php
require_once('./ws.class.php');

class Server extends WebSocketServer
{
    protected function process($user, $message)
    {
        $message = json_decode($message);
        switch ($message->command) {
            case 'selamver':
                $this->send($user, json_encode(array("command" => "selamver", "message" => "Merhaba")));
                break;
            case 'adinisor':
                $this->send($user, json_encode(array("command" => "adinisoyle", "message" => "Benim adım websocket!")));
                break;
        }
    }

    protected function connected($user)
    {
        //Kullanıcılar bağlandığı zaman çalışacak fonksiyon
    }

    protected function closed($user)
    {
        // Kullanıcı bağlantısı sonlandığı zaman çalışacak fonksiyon
    }
}

$echo = new Server("0.0.0.0", "9000");

try {
    $echo->run();
} catch (Exception $e) {
    $echo->stdout($e->getMessage());
}

Şimdi ssh ile sunucumuza bağlanıp aşağıdaki komudu çalıştıracağız

php server.php

Şimdi Javascript ile WebSocketimize bağlanmak için aşağıdaki kodları kullanıyoruz

var socket;

function init() {
    var host = "ws://sunucuipadresi:9000";
    try {
        socket = new WebSocket(host);
        console.log('WebSocket - status ' + socket.readyState);
        socket.onopen = function (msg) {
            run();
        };
        socket.onmessage = function (msg) {
            var message = JSON.parse(msg.data);
            console.log(message);
        };
        socket.onclose = function (msg) {
            console.log("Disconnected - status " + this.readyState);
        };
        socket.onerror = function (msg) {
            reconnect();
        };
    } catch (ex) {
        console.log(ex);
    }
}

function run() {
    socket.send(JSON.stringify({
        "command": "selamver"
    }));
    setTimeout(() => {
        socket.send(JSON.stringify({
            "command": "adinisor"
        }));
    }, 2000);
}

function quit() {
    if (socket != null) {
        log("Goodbye!");
        socket.close();
        socket = null;
    }
}

function reconnect() {
    quit();
    init();
}

Comments


Yorum yazın







Tarih: