
Perl: Extracting Alpha Vantage stock price history via parallel threads

“She’s not a girl who misses much…”
“Happiness Is A Warm Gun”, from The Beatles’ “The White Album” (1968)

As we become better acquainted with the valuable stream of free financial information generously provided by a company called Alpha Vantage, we will now put together a Perl script that facilitates data extraction from their feed. The service is free, but you have to register to obtain your individual API key. For obvious reasons we use the demo API key in the Perl code examples here; we recommend that you get your own.

“Not missing much” but still allowing a few to slip through our fingers, so that we can always catch them tomorrow: this will be the motto for our exercise. We promise that the message will become clearer as we make progress.

Task

Our task for this project may be stated like this: create a Perl script that operates as a Web crawler and retrieves and saves historical stock prices for a list of tickers read from a text file, with a decent success rate and without putting much stress on the data source.

Test download

Today it is fairly easy to perform an HTTP or HTTPS request in Perl with the LWP module. Our first test drive will ensure that our download automation does in fact work. Here we demonstrate that anyone can retrieve MSFT pricing history in CSV format from the Alpha Vantage Web service and save the output to a file using, say, the following few lines:


use LWP;

my $AVAPIKEY = 'demo'; # please obtain your own API KEY at https://www.alphavantage.co/support/#api-key
my $AVURL = 'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&outputsize=full&apikey='
 . $AVAPIKEY . '&datatype=csv&symbol=';

my $symbol = 'MSFT';

my $browser = LWP::UserAgent->new();

my $response = $browser->get( $AVURL . $symbol );

local ( *LFILE );
open( LFILE, '>' . $symbol ) or die 'Could not open output file ' . $symbol . ' for writing!' . "\n";
print LFILE  $response->decoded_content . "\n";
close LFILE;

print 'DONE.' . "\n";

Real Life Experience

It works! Well… it works rather reliably if you run this script after regular business hours. At the opening bell and right after the close of trading your mileage may vary. The Alpha Vantage service tolerates no more than roughly 30 requests per minute (from one user or from one IP address). That means that if you run 30 queries back to back with a two-second delay between requests, staying within one minute, then in theory you may see a success rate close to 99%. In our humble automation attempt we enforce this by calculating a “requests per second” rate. If you exceed that rate (or if the Alpha Vantage Web service is overwhelmed with other users’ requests) you may get a short response carrying a warning or error message instead of the normal data.

Moreover, since the Alpha Vantage Web service is hosted on a Heroku cloud cluster, their load balancers introduce another source of random delay on top of the time actually spent retrieving the data and producing the output. If we access the Alpha Vantage service today, each request may take anywhere from two or three seconds up to 15 or even 20 seconds to complete. That means significant time spent waiting when requesting price history for 100 or more tickers. Some of the requests will fail and will have to be retried. We should also introduce adaptive delays between requests depending on the outcome of previous attempts.
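
To make these ideas concrete before we build the full crawler, here is a minimal sketch of the two throttling ingredients just described: an overall requests-per-second check against the time elapsed since start, and a random delay that widens as errors accumulate. The names used here ($req_cnt, $err_cnt, throttle, and so on) are illustrative only; the real shared-variable implementation appears later.


use Time::HiRes qw(time sleep);  # sub-second precision for rate math and delays

my $begin     = time;   # when we started placing requests
my $req_cnt   = 0;      # requests placed so far
my $err_cnt   = 0;      # failed requests so far
my $max_rate  = 0.46;   # ceiling in requests per second (roughly 27 per minute)
my $min_delay = 0.50;   # base delay in seconds

sub throttle {
   do {  # always pause a little; the random part grows with the error count
      sleep( $min_delay + rand( $err_cnt ) );
   } while ( $req_cnt / ( time - $begin ) >= $max_rate );
   $req_cnt++;  # we are now clear to place one more request
}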

How about creating a more elegant solution with parallel worker threads that uses an appropriate request rate and handles complications while producing results in reasonable time? Let us wait no more and get to it.

Implementation

Programming parallel worker threads in Perl is not rocket science. Though intimidating for a beginner, after some thoughtful exercises, trials, and tribulations, one can arrive at elegant and useful code that enjoys the benefits of functional programming and of inter-thread communication facilities (queues, shared variables, and semaphores, in the spirit of classic IPC). Needless to say, multi-threading is a common feature of popular operating systems (Windows or Linux, it does not matter much) running on modern multi-core CPUs. We also try to design our code so that it runs fairly well on most systems, with minimal dependencies on optional modules, external tools, or libraries.

Loading tickers from text file

As our first step we read a file name as the first argument of the script and load our tickers from that file. We expect a simple format: one ticker per line, optionally followed by a description separated by a tab character (\t). A short sample appears below.
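
For example, a list file in this format might look like the following (the descriptions after the tab character are optional and purely illustrative):


MSFT	Microsoft Corporation
IBM	International Business Machines
T
GOOG	Alphabet Inc Class C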


use threads;
use Thread::Queue;

my ( $listfile ) = @ARGV;

local (*LFILE);
open ( LFILE, '< ' . $listfile ) or die 'Could not open list file ' . $listfile . "\n";
my $newjobs = Thread::Queue->new();  # queue to hold all tickers

while ( <LFILE> ) {
   chomp;
   my ( $Ticker ) = split "\t"; # ticker may be followed by optional description delimited with tab
   $newjobs->enqueue( $Ticker );
}

close LFILE;

As a result we should have a queue $newjobs full of ticker symbols to download; the request URL and the output file name will be derived from each symbol later. For simplicity we do not check for duplicates here: if a ticker symbol is not unique, it will be requested as many times as it appears in the list file.
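
If duplicates ever became a concern, a simple %seen hash would let us enqueue each symbol only once. A minimal variation of the reading loop above could look like this (illustrative only, not part of the final script):


my %seen;
while ( <LFILE> ) {
   chomp;
   my ( $Ticker ) = split "\t";
   next unless defined $Ticker && length $Ticker;  # skip blank lines
   next if $seen{$Ticker}++;                       # skip symbols we have already queued
   $newjobs->enqueue( $Ticker );
}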

Parallel workers to process the jobs

At this point we define the skeleton of a windmill with multiple workers that service all the requests one by one, each in a separate thread of code running in parallel. By design, each worker thread has its own unique number, $worker_id. Before a worker starts polling the queue for new jobs, it creates its own $browser object for LWP API calls. You may notice that we need to set a limit on the number of parallel workers to create; that limit is held in the variable $max_parallel_wrks. More details will be provided a bit later.


my $worker_id = 0;

my @workers = map {
   $worker_id++;
   threads->create( 
      sub {   
         my $browser = LWP::UserAgent->new();
         while ( defined ( my $item = $newjobs->dequeue ) ) { 
            threads->yield;  # yield cpu time to allow other threads to work
            download( $browser, $item );
         }  
      } 
   );
} 1 .. $max_parallel_wrks;

$newjobs->enqueue( undef ) for @workers;  # indicate end of job by enqueuing undefined value for each worker

$_->join() for @workers; # need to wait for workers to finish their jobs

Download method

In the thread code above we invoke a method called download. Let us define this critical method using the familiar LWP calls that implement a GET request:


sub download {
   my ( $browser, $symbol ) = @_;

   my $response = $browser->get( $AVURL . $symbol, 'Accept-Encoding' => $enc_accept );

   my $dlfile = $symbol . '.csv.gz' ;  # name the output file after the ticker symbol
   my $z = IO::Compress::Gzip->new( $dlfile ) or die 'Could not write to ' . $dlfile . ': ' . $GzipError . "\n";
   print $z $response->decoded_content;
   close $z;
   -s $dlfile;
}

Note that instead of a plain text file we are now writing the received data into a file in compressed GZip format (.gz) via the IO::Compress::Gzip module. Upon completion, the download method returns the size of the downloaded file as a way to determine whether the data were received and saved successfully.
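
For later processing, the compressed CSV can be read back with the companion IO::Uncompress::Gunzip module. A minimal sketch, assuming a file named MSFT.csv.gz saved by the method above:


use IO::Uncompress::Gunzip qw(gunzip $GunzipError);

my $dlfile = 'MSFT.csv.gz';
my $z = IO::Uncompress::Gunzip->new( $dlfile )
   or die 'Could not read ' . $dlfile . ': ' . $GunzipError . "\n";
while ( my $line = $z->getline ) {
   print $line;  # each line is one CSV record of the daily price history
}
$z->close;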

Enhanced functionality

You might be wondering where the adaptive delays, the checking of download outcomes, and all the bells and whistles announced in the introduction are. You would be right to ask! We will produce them here and now. For starters, here are a couple of trivial helpers for logging and diagnostics.

The dtlog helper produces a meaningful log prefix with a date and time stamp and the worker's unique number, identifying each of the worker threads:


sub dtlog {
   strftime( '%a %b %d %H:%M:%S ', localtime ) . ' #' . sprintf( '%02d', $worker_id ) . ' ';
}

Sometimes we will have to wait until the next minute to continue. We have discovered that it is prudent to have a pause button that temporarily demands that all workers hold off placing new requests; we press it whenever the Web service returns an error or a rate-limit warning. For this situation we use wait_next_minute, which simply waits until the next minute starts.


sub secs {   # get just seconds from current time
   strftime( '%S', localtime );
}

sub wait_next_minute {
   my $begin = secs();
   return if $begin == 0;  # at second "00" the loop below would never end ("00" is a true string), so test numerically
   while ( secs() >= $begin ) {
      sleep( $min_delay );
   }
}

We also want to alternate the IP address used for the Alpha Vantage hostname among the multiple IPs published in their DNS records, obtained directly rather than by relying on our DNS client to do this for us (we assume the operating system's resolver would cache the first IP, while we want more variety). This is done with LWP::UserAgent::DNS::Hosts, which sets up a specific hostname resolution for one host. It may be the only optional module you have to install for this project.


use LWP::UserAgent::DNS::Hosts;  # you may need to install this module via your Perl module manager

my $hostsbyip = {};

sub alternateIP {
   my ( $hostname ) = @_;
   if ( ! exists $hostsbyip->{$hostname} ) {
      my ( $name, $aliases, $addrtype, $length, @addrs ) = gethostbyname( $hostname );
      @{$hostsbyip->{$hostname}} = map { join( '.', unpack( 'C4', $_ ) ) } @addrs;
   }
   my $randomIPhost = $hostsbyip->{$hostname}[int(rand( scalar @{$hostsbyip->{$hostname}} ))];
   LWP::UserAgent::DNS::Hosts->register_host( $hostname => $randomIPhost );
   LWP::UserAgent::DNS::Hosts->enable_override;
}

Below is a method to handle a failure. It produces an error message, removes the output file (which has already been saved by then), and presses the pause button, waiting until the next minute begins. Note that we use the $crawler_pause_ALL variable as the pause button, a flag shared by all worker threads. To do this we use the threads::shared module and a special way of declaring the variable:


use threads::shared;
my $crawler_pause_ALL :shared = 0;

sub failed {
   my ( $reason, $tfile, $response ) = @_;
   {
      lock( $crawler_req_err );
      $crawler_req_err++;
   }
   print dtlog . $reason . ', resp: ' . $response->code . "\n";
   unlink $tfile;
   if ( ! $crawler_pause_ALL ) {
      lock( $crawler_pause_ALL );
      $crawler_pause_ALL = 1;
      wait_next_minute() if secs() > $crawler_max_secs;
      $crawler_pause_ALL = 0;
   }
}

Finally, let us get back to our favorite download method. But, oh boy, you would not even recognize it now, all dressed up and fancy! A lot has been added to handle errors and other special situations. The object named $crawler_maxreqs_limit is a semaphore that allows us to limit the maximum number of parallel requests by counting, and by blocking (holding) the execution of a thread that tries to run once the limit is reached. When a thread completes its LWP request, it raises the semaphore, allowing other threads to proceed. Think of the semaphore as a shared counter that blocks a worker thread when the counter hits zero, until the shared value becomes a positive number again.
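
As a standalone illustration of that counting behavior (the worker count and the fake workload below are hypothetical, separate from the crawler), a semaphore initialized to 2 lets at most two of five threads sit inside the guarded section at any given moment:


use threads;
use Thread::Semaphore;

my $slots = Thread::Semaphore->new( 2 );  # allow at most 2 threads past down() at a time

my @demo = map {
   threads->create( sub {
      $slots->down;   # take a slot, or block here until one is released
      sleep 1;        # pretend to perform an HTTP request
      $slots->up;     # release the slot so another waiting thread may proceed
   } );
} 1 .. 5;

$_->join() for @demo;

With that picture in mind, here is the enhanced download method together with the configuration knobs it relies on: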


use threads::shared;
use Thread::Semaphore;

my $crawler_max_threads = 16;
my $crawler_min_threads = 4;
my $crawler_max_parallel_requests = 10;
my $crawler_max_req_rate = 0.46;
my $crawler_max_failed = 6;
my $crawler_max_attempts = 3;
my $crawler_req_timeout = 30;
my $crawler_max_secs = 32;
my $crawler_min_bytes = 500;
my $crawler_req_cnt :shared = 0;
my $crawler_req_err :shared = 0;

my $crawler_maxreqs_limit = Thread::Semaphore->new( $crawler_max_parallel_requests ) ;

sub download {
   my ( $browser, $symbol ) = @_;

   $crawler_maxreqs_limit->down; # wait for our turn, or lower the semaphore

   my $delaytime = $min_delay + rand( $crawler_req_err ) ; # increase random delay range when more errors
   my $persec = 0;

   do {  # initial delay, then check our pause ALL button and current rate of requests, if we can work
      sleep $delaytime;
   } until ( ! $crawler_pause_ALL && ( $persec = $crawler_req_cnt / ( time - $crawler_begin ) ) < $crawler_max_req_rate ); 
   { # counting outgoing requests via shared variable 
      lock( $crawler_req_cnt ); 
      $crawler_req_cnt++; 
   }
   my ( $hostname ) = ( $AVURL =~ /\:\/\/(.*?)\// );
   alternateIP( $hostname ); # switch to a different random IP for this hostname
   my $response = $browser->get( $AVURL . $symbol, 'Accept-Encoding' => $enc_accept );
   $crawler_maxreqs_limit->up; # raise the semaphore, upon get completion, allowing other threads to place requests
   my $dlfile = $symbol . '.csv.gz' ;
   my $tfile = $dlfile . $$ . '.tmp' ;
   my $z = IO::Compress::Gzip->new( $tfile ) or die 'Could not write to ' . $tfile . ': ' . $GzipError . "\n";
   print $z $response->decoded_content;
   close $z;
   if ( $response->code ne '200' ) {
      failed( 'Failed, bad or empty: ' . $tfile, $tfile, $response ); 
      return; 
   };
   my $dlfsize = -s $tfile;
   if ( $dlfsize ) {
      if ( $dlfsize < $crawler_min_bytes ) {
         failed( 'Response too small, ignored: ' . $dlfile . ' = ' . $dlfsize, $tfile, $response ); 
         return; 
      };
      if ( ! rename $tfile, $dlfile ) {
         failed( 'Could not rename tempfile: ' . $dlfile, $tfile, $response );
         return;
      };
      $dlfsize = -s $dlfile;
      print dtlog . 'Received new (' . $symbol . ') ' . $dlfsize . ' bytes..' . "\n";
   }
   $dlfsize;
}

Please note that with this more advanced logic we save the result into a temporary file first, so as to preserve the good old data files saved by prior attempts in case we hit an error and have to throw the temporary result down the drain.

Final script

By this time we have explained most of the crawler script's functionality. Let us lay out the full source code, which you should be able to run after a minimal tweak or two:


#!perl

use warnings;
use strict;

use LWP;
use LWP::UserAgent::DNS::Hosts;  # you may need to install this module via your Perl module manager

use IO::Compress::Gzip qw(gzip $GzipError) ;

use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;

use POSIX qw(strftime);

use Time::HiRes qw(time sleep); # we need that for better precision with delays (milliseconds)

my $enc_accept = HTTP::Message::decodable;

my $min_delay = 0.50;
my $rnd_delay = 1.25;

my $AVAPIKEY = 'demo'; # please obtain your own API KEY at https://www.alphavantage.co/support/#api-key
my $AVURL = 'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&outputsize=full&apikey='
 . $AVAPIKEY . '&datatype=csv&symbol=';

my $crawler_max_threads = 16; # number of parallel workers at the beginning
my $crawler_min_threads = 4;  # decreased number of parallel workers in case of errors
my $crawler_max_parallel_requests = 10;  # limit number of parallel web requests
my $crawler_max_req_rate = 0.46;  # limit number of parallel requests per second via rate value
my $crawler_max_failed = 6;  # after so many failed requests, the number of workers will be decreased to minimum
my $crawler_max_attempts = 3;  # after so many tries and failed attempts the worker will give up requesting a symbol
my $crawler_req_timeout = 30;  # HTTP timeout for LWP
my $crawler_max_secs = 32;  # maximum delay and also a second half of a minute when wait_next_minute function will be invoked
my $crawler_min_bytes = 500;  # minimum saved file size to determine a successful completion
my $crawler_pause_ALL :shared = 0;  # a universal pause button for all threads preventing placing new requests (temporarily)
my $crawler_req_cnt :shared = 0;  # number of requests overall
my $crawler_req_err :shared = 0;  # number of failed requests

# semaphore to limit parallel requests for all threads
my $crawler_maxreqs_limit = Thread::Semaphore->new( $crawler_max_parallel_requests ) ;

my $crawler_begin = time;  # record starting time for request rate calculation

my $worker_id = 0;  # will be assigned with worker thread id inside threads, 0 for main thread execution

sub dtlog {
   strftime( '%a %b %d %H:%M:%S ', localtime ) . ' #' . sprintf( '%02d', $worker_id ) . ' ';
}

sub secs { # get just seconds from current time
   strftime( '%S', localtime );
}

sub wait_next_minute {
   my $begin = secs();
   return if $begin == 0;  # at second "00" the loop below would never end ("00" is a true string), so test numerically
   while ( secs() >= $begin ) {
      sleep( $min_delay );
   }
}

my $hostsbyip = {};

sub alternateIP {
   my ( $hostname ) = @_;
   if ( ! exists $hostsbyip->{$hostname} ) {
      my ( $name, $aliases, $addrtype, $length, @addrs ) = gethostbyname( $hostname );
      @{$hostsbyip->{$hostname}} = map { join( '.', unpack( 'C4', $_ ) ) } @addrs;
   }
   my $randomIPhost = $hostsbyip->{$hostname}[int(rand( scalar @{$hostsbyip->{$hostname}} ))];
   LWP::UserAgent::DNS::Hosts->register_host( $hostname => $randomIPhost );
   LWP::UserAgent::DNS::Hosts->enable_override;
}

sub failed {
   my ( $reason, $tfile, $response ) = @_;
   {
      lock( $crawler_req_err );
      $crawler_req_err++;
   }
   print dtlog . $reason . ', resp: ' . $response->code . "\n";
   unlink $tfile;
   if ( ! $crawler_pause_ALL ) {
      lock( $crawler_pause_ALL );
      $crawler_pause_ALL = 1;
      wait_next_minute() if secs() > $crawler_max_secs;
      $crawler_pause_ALL = 0;
   }
}

sub download {
   my ( $browser, $symbol ) = @_;

   $crawler_maxreqs_limit->down; # wait for our turn, or lower the semaphore

   my $delaytime = $min_delay + rand( $crawler_req_err ) ;
   my $persec = 0;

   do { # initial delay, then check our pause ALL button and current rate of requests, if we can work
      sleep $delaytime;
   } until ( ! $crawler_pause_ALL && ( $persec = $crawler_req_cnt / ( time - $crawler_begin ) ) < $crawler_max_req_rate );
   { # counting outgoing requests via shared variable
      lock( $crawler_req_cnt );
      $crawler_req_cnt++; 
   }
   my ( $hostname ) = ( $AVURL =~ /\:\/\/(.*?)\// );
   alternateIP( $hostname ); # switch to a different random IP for this hostname
   my $response = $browser->get( $AVURL . $symbol, 'Accept-Encoding' => $enc_accept );

   $crawler_maxreqs_limit->up; # raise the semaphore, upon get completion, allowing other threads to place requests

   my $dlfile = $symbol . '.csv.gz' ;
   my $tfile = $dlfile . $$ . '.tmp' ;
   my $z = IO::Compress::Gzip->new( $tfile ) or die 'Could not write to ' . $tfile . ': ' . $GzipError . "\n";
   print $z $response->decoded_content;
   close $z;
   if ( $response->code ne '200' ) {
      failed( 'Failed, bad or empty: ' . $tfile, $tfile, $response );
      return;
   };
   my $dlfsize = -s $tfile;
   if ( $dlfsize ) {
      if ( $dlfsize < $crawler_min_bytes ) {
         failed( 'Response too small, ignored: ' . $dlfile . ' = ' . $dlfsize, $tfile, $response );
         return;
      };
      if ( ! rename $tfile, $dlfile ) {
         failed( 'Could not rename tempfile: ' . $dlfile, $tfile, $response );
         return;
      };
      $dlfsize = -s $dlfile;
      print dtlog . 'Received new (' . $symbol . ') ' . $dlfsize . ' bytes..' . "\n";
   }
   $dlfsize;
}

#

$| = 1;

srand;

my ( $listfile ) = @ARGV;

die 'Usage: crawler.pl listfile' . "\n" unless $listfile;

die 'Empty list file: ' . $listfile . "\n" unless -s $listfile;

local (*LFILE);
open ( LFILE, '<' . $listfile ) or die 'Could not open list file ' . $listfile . "\n";
my $newjobs = Thread::Queue->new();
my $jobs_counter = 0;

while ( <LFILE> ) {
   chomp;
   my ( $Ticker ) = split "\t";
   $newjobs->enqueue( $Ticker );
   $jobs_counter++;
}

close LFILE;

print dtlog . 'Ticker list loaded, queued: ' . $jobs_counter . "\n";

my $max_parallel_wrks = ( $crawler_max_threads < $jobs_counter ) ? $crawler_max_threads : $jobs_counter ;
my @workers = map {
   $worker_id++;
   threads->create( 
     sub {   
        my $browser = LWP::UserAgent->new();
        $browser->timeout( $crawler_req_timeout );
        sleep rand( $rnd_delay + $worker_id );
        while ( defined ( my $item = $newjobs->dequeue ) ) { 
            threads->yield;
            my $ntry = 1;
            do {
               print dtlog . 'Try ' . $ntry . ' (' . $item . ')' . "\n";
            } until ( download( $browser, $item ) || ++$ntry > $crawler_max_attempts );
            return if $worker_id > $crawler_min_threads && $crawler_max_failed < $crawler_req_err ;
        }
     }
  );
} 1 .. $max_parallel_wrks;

$worker_id = 0;
sleep $min_delay;
$newjobs->enqueue( undef ) for @workers;  
$_->join() for @workers;
print dtlog . 'DONE.' . "\n";
exit 0;

Test ticker file

Now, using our favorite text editor, let us prepare a list file tickers.txt containing 23 sample tickers (an arbitrary selection with no preference at all, just for the sake of this demo):


C
B
D
A
W
X
Y
Z
AAPL
FB
GOOG
SPY
DDD
QQQ
VZ
T
BB
MMM
ZZZ
R
E
NG
K

Real test run

The command line and test output for our freshly baked crawler script follow (remember, we are using our real API key instead of the demo key for this run; are you?):


$ perl crawler.pl tickers.txt
Sat Feb 17 17:46:59 #00 Ticker list loaded, queued: 23
Sat Feb 17 17:46:59 #14 Try 1 (C)
Sat Feb 17 17:46:59 #01 Try 1 (B)
Sat Feb 17 17:47:00 #03 Try 1 (D)
Sat Feb 17 17:47:01 #02 Try 1 (A)
Sat Feb 17 17:47:01 #09 Try 1 (W)
Sat Feb 17 17:47:01 #10 Try 1 (X)
Sat Feb 17 17:47:02 #06 Try 1 (Y)
Sat Feb 17 17:47:04 #08 Try 1 (Z)
Sat Feb 17 17:47:04 #04 Try 1 (AAPL)
Sat Feb 17 17:47:05 #05 Try 1 (FB)
Sat Feb 17 17:47:05 #13 Try 1 (GOOG)
Sat Feb 17 17:47:05 #14 Received new (C) 93569 bytes..
Sat Feb 17 17:47:05 #14 Try 1 (SPY)
Sat Feb 17 17:47:06 #12 Try 1 (DDD)
Sat Feb 17 17:47:06 #11 Try 1 (QQQ)
Sat Feb 17 17:47:06 #08 Received new (Z) 12139 bytes..
Sat Feb 17 17:47:06 #08 Try 1 (VZ)
Sat Feb 17 17:47:07 #07 Try 1 (T)
Sat Feb 17 17:47:07 #03 Received new (D) 90894 bytes..
Sat Feb 17 17:47:07 #03 Try 1 (BB)
Sat Feb 17 17:47:09 #10 Received new (X) 94585 bytes..
Sat Feb 17 17:47:09 #10 Try 1 (MMM)
Sat Feb 17 17:47:11 #09 Received new (W) 16745 bytes..
Sat Feb 17 17:47:11 #09 Try 1 (ZZZ)
Sat Feb 17 17:47:13 #07 Received new (T) 90518 bytes..
Sat Feb 17 17:47:13 #07 Try 1 (R)
Sat Feb 17 17:47:14 #15 Try 1 (E)
Sat Feb 17 17:47:15 #16 Try 1 (NG)
Sat Feb 17 17:47:17 #11 Received new (QQQ) 100703 bytes..
Sat Feb 17 17:47:17 #11 Try 1 (K)
Sat Feb 17 17:47:19 #02 Received new (A) 91423 bytes..
Sat Feb 17 17:47:20 #13 Received new (GOOG) 21932 bytes..
Sat Feb 17 17:47:21 #10 Received new (MMM) 95598 bytes..
Sat Feb 17 17:47:25 #09 Received new (ZZZ) 1191 bytes..
Sat Feb 17 17:47:26 #06 Received new (Y) 83553 bytes..
Sat Feb 17 17:47:28 #03 Received new (BB) 89644 bytes..
Sat Feb 17 17:47:34 #04 Received new (AAPL) 101977 bytes..
Sat Feb 17 17:47:37 #11 Received new (K) 89000 bytes..
Sat Feb 17 17:47:38 #12 Received new (DDD) 79709 bytes..
Sat Feb 17 17:47:38 #16 Received new (NG) 59517 bytes..
Sat Feb 17 17:47:41 #14 Received new (SPY) 105393 bytes..
Sat Feb 17 17:47:42 #05 Received new (FB) 29390 bytes..
Sat Feb 17 17:47:43 #15 Received new (E) 91171 bytes..
Sat Feb 17 17:47:48 #07 Received new (R) 91383 bytes..
Sat Feb 17 17:47:50 #08 Received new (VZ) 91758 bytes..
Sat Feb 17 17:47:53 #01 Received new (B) 84649 bytes..
Sat Feb 17 17:47:53 #00 DONE.

Conclusion

We have successfully run the script to obtain the daily price history for 23 tickers, and it completed in less than a minute. If you would like to compare how long it takes to run with a single thread (without parallel requests), set $crawler_max_threads to 1 and run the crawler again. It should take at least two to three times longer, if not more, for the same list of tickers. “Like a lizard on a window pane.” Mission accomplished. Cheers.
