Search code examples
phptcpraspberry-pi

Trying to build a TCP Server for ingesting data


I'm trying to build a system to replace a current system that we have. It's a streaming system for audio. The current system has a closed source program that runs on a pc that ingests data and audio. I'm working with a raspberry pi and TCP data to try and replace the current system. The clients send their data to the pi and the pi takes the data and logs it.

The way I understand it, the way this works, in theory, is that the client makes a connection to the server. Then the client can send data through that connection until it either disconnects or there is a problem with the connection.

The client can continue sending data as needed. In my case, the amount of characters is the same every time, but it seems like the server is getting the first bit of data and then basically timing out. Then the client makes another connection and it gets that bit of data, times out and repeats.

I know this because the system that sends the data logs it and there will be 10 lines on the clients but only 5 or so on the server.

Here is my code, I'm doing it in PHP cause I'm more comfortable with that, but I'd be open to changing to python or something if it would make the process simpler.

#!/usr/bin/php -q 
<?php 
/** 
  * Listens for requests and forks on each connection 
  */ 

$__server_listening = true; 

error_reporting(E_ALL); 
set_time_limit(0); 
ob_implicit_flush(); 
declare(ticks = 1); 

become_daemon(); 

/* nobody/nogroup, change to your host's uid/gid of the non-priv user */ 
//change_identity(1000, 1000); 

/* handle signals */ 
pcntl_signal(SIGTERM, 'sig_handler'); 
pcntl_signal(SIGINT, 'sig_handler'); 
pcntl_signal(SIGCHLD, 'sig_handler'); 

/* change this to your own host / port */ 
server_loop("10.0.3.49", 9009); 

/** 
  * Change the identity to a non-priv user 
  */ 
function change_identity( $uid, $gid ) 
{ 
    if( !posix_setgid( $gid ) ) 
    { 
        print "Unable to setgid to " . $gid . "!\n"; 
        exit; 
    } 

    if( !posix_setuid( $uid ) ) 
    { 
        print "Unable to setuid to " . $uid . "!\n"; 
        exit; 
    } 
} 

/** 
  * Creates a server socket and listens for incoming client connections 
  * @param string $address The address to listen on 
  * @param int $port The port to listen on 
  */ 
function server_loop($address, $port) 
{ 
    GLOBAL $__server_listening; 

    if(($sock = socket_create(AF_INET, SOCK_STREAM, 0)) < 0) 
    { 
        echo "failed to create socket: ".socket_strerror($sock)."\n"; 
        exit(); 
    } 

    if(($ret = socket_bind($sock, $address, $port)) < 0) 
    { 
        echo "failed to bind socket: ".socket_strerror($ret)."\n"; 
        exit(); 
    } 

    if( ( $ret = socket_listen( $sock, 0 ) ) < 0 ) 
    { 
        echo "failed to listen to socket: ".socket_strerror($ret)."\n"; 
        exit(); 
    } 

    socket_set_nonblock($sock); 
    
    echo "waiting for clients to connect\n"; 

    while ($__server_listening) 
    { 
        $connection = @socket_accept($sock); 
        if ($connection === false) 
        { 
            usleep(100); 
        }elseif ($connection > 0) 
        { 
            handle_client($sock, $connection); 
        }else 
        { 
            echo "error: ".socket_strerror($connection); 
            die; 
        } 
    } 
} 

/** 
  * Signal handler 
  */ 
function sig_handler($sig) 
{ 
    switch($sig) 
    { 
        case SIGTERM: 
        case SIGINT: 
            exit(); 
        break; 

        case SIGCHLD: 
            pcntl_waitpid(-1, $status); 
        break; 
    } 
} 

/** 
  * Handle a new client connection 
  */ 
function handle_client($ssock, $csock) 
{ 
    GLOBAL $__server_listening; 

    $pid = pcntl_fork(); 

    if ($pid == -1) 
    { 
        /* fork failed */ 
        echo "fork failure!\n"; 
        die; 
    }elseif ($pid == 0) 
    { 
        /* child process */ 
        $__server_listening = false; 
        socket_close($ssock); 
        interact($csock); 
        socket_close($csock); 
    }else 
    { 
        socket_close($csock); 
    } 
} 

function interact($socket) 
{ 
    $buf = socket_read($socket, 2048, PHP_NORMAL_READ);

	echo $buf . "\n";

	$file = '/home/pi/nowplaying.log';

	// Open the file to get existing content
	$current = file_get_contents($file);
	// Append a new person to the file
	$current .= date('Y-m-d h:i:s') . ' ' . $buf . "\n";
	// Write the contents back to the file
	file_put_contents($file, $current);


    $station = substr($buf, 0, 6);
    $type = substr($buf, 14, 4);
    $artist = preg_replace('/\s+/', ' ', substr($buf, 48, 28));
    $title = preg_replace('/\s+/', ' ', substr($buf, 20, 28));
    $length = substr($buf, 80, 5);

    echo "Station is: " . $station . "\n";
    echo "Type is: " . $type . "\n";
    echo "Song is: " . $artist . " - " . $title . "\n";
    echo "Length is: " . $length . "\n";

    //postToServer($station, $type, $artist, $title, $length);
    //updateIceCastServer($station, $type, $artist, $title, $length);

} 

function postToServer($station, $type, $artist, $title, $length)
{

	$url = 'http://website.net/data/' . $station . '/';

	$data = array('artist' => $artist, 'title' => $title, 'type' => $type, 'length' => $length);

	// use key 'http' even if you send the request to https://...
	$options = array(
	    'http' => array(
	        'header'  => "Content-type: application/x-www-form-urlencoded\r\n",
	        'method'  => 'POST',
	        'content' => http_build_query($data)
	    )
	);
	$context  = stream_context_create($options);
	$result = file_get_contents($url, false, $context);
	if ($result === FALSE) { echo 'failed'; }

	var_dump($result);

}

function updateIceCastServer($station, $type, $artist, $title, $length)
{
	$username = 'admin';
	$password = 'admin';

	$url = 'http://ice.mywebserver.net:8000/admin/metadata?mount=/' . $station . '&mode=updinfo&song=' . $artist . ' - ' . $title;
	$data = array('artist' => $artist, 'title' => $title, 'type' => $type, 'length' => $length);

	// use key 'http' even if you send the request to https://...
	$options = array(
	    'http' => array(
	        'header'  => "Authorization: Basic " . base64_encode("$username:$password"),
	        'method'  => 'POST',
	        'content' => http_build_query($data),
	    )
	);
	$context  = stream_context_create($options);
	$result = file_get_contents($url, false, $context);
	if ($result === FALSE) { echo 'failed'; }

	var_dump($result);

}
/** 
  * Become a daemon by forking and closing the parent 
  */ 
function become_daemon() 
{ 
    $pid = pcntl_fork(); 
    
    if ($pid == -1) 
    { 
        /* fork failed */ 
        echo "fork failure!\n"; 
        exit(); 
    }elseif ($pid) 
    { 
        /* close the parent */ 
        exit(); 
    }else 
    { 
        /* child becomes our daemon */ 
        posix_setsid(); 
        chdir('/'); 
        umask(0); 
        return posix_getpid(); 

    } 
} 

?>


Solution

  • I ended up using netcat for this. It was much simpler to open the port with NC and the pass the incoming data.

    For my situation I ended up taking the data and duing a curl request to a microservice to do what I need to with it.

    Here was my final script to do all of this. I run a basic cron job to keep it running.

    #!/bin/bash
    
    logfile=smdv2.sh.log
    today=`date +%Y%m%d`
    
    log=~/$today-$logfile
    
    exec > >(tee -ai $log) && 2>&1
    
    echo "Started listening on port 9000"
    
    nc -klu 9000 | while read line
    do
        if [ "$line" == '' ]; then
        echo "Recieved Blank Line, Skipping"
    else
        echo "Recieved Line"
        echo $line
        cart=${line:6:7}
        type=${line:13:4}
        title=${line:19:28}
        length=${line:79:5}
    
        #Data Encoded Post
        curl -s \
        --data-urlencode "cart=$cart" \
        --data-urlencode "type=$type" \
        --data-urlencode "title=$title" \
        --data-urlencode "length=$length" \
        -X POST http://my.server.io/metadata/update
    
    fi
    

    done