Perl脚本多线程上传FTP文件
本脚本是对《[Perl]FTP自动上传文件的脚本以及配置文件》的多线程扩展,当然首先对方FTP站点允许同一个IP发起多个连接。
ithreads支持
Perl5.6.0已经加入了ithreads支持,我们通过
use threads;
导入threads多线程处理包;并且通过
use threads::shared;
使用线程间共享变量。
在定义全局并线程间共享的变量时,要这样写:
my $CurrentThreads: shared = 0; #当前线程总数
因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。如果想让一个变量共享,须要显式地指定它才行。
当然我们还要限制并发活动线程数目,因为最多线程数是1-65。perl最多允许64个子线程,加上主线程因此最多65个线程,而且FTP站点允许多大连接数。这里$MaxThreads设置是从配置文件读取的。
配置文件的读取我们是采用
use Config::IniFiles;
库的。它的读取很简单:
my $cfg = Config::IniFiles->new( -file => "FTPUpload.config" );
等待线程的完全退出
通过不断地调用
my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);
启动了多干个线程后,我们这里一定要调用
push(@$self, \$thread);
因为,创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。
所以,在最后我们要等待这些线程的完全退出:
foreach my $thread (@$self)
{
print("Joining thread\n");
$$thread->join();
}
否则,threads库会在最后打印如下信息:
A thread exited while 2 threads were running.
程序大致的流程是:
第一步,尝试用配置信息登陆ftp站点,这只是验证能够登陆而已;
第二步,在指定文件夹A类下寻找符合条件的文件,并将对每一个A类文件创建一个线程上传。每个线程单独创建一个FTP连接,并不断地尝试上传这个文件直至成功;
第三步,如果A类文件们全部上传成功,那么在指定文件夹B类下寻找指定文件, 并且上传到FTP指定目录下
第四步,写成功/失败日志。
最后,我们要写的成功/失败日志的格式如下所示:
成功: 生成一个名为“Upload_Succ_2005_01_04_17_23.log”的日志文件
文件格式:输出上传时间,以及所有上传文件名及其大小和耗费的时间。
失败: 生成一个名为“Upload_Fail_2005_01_04_17_23.log”的日志文件
文件格式:输出上传时间,以及已经上传的文件名及其大小和耗费的时间,和失败的文件名及原因。
配置perl脚本运行有两个办法:
u 您可以在Windows计划任务中配置运行“Perl Upload.pl”的时间,这需要在Windows环境中配置ActivePerl 5.8.4.810;
u 您也可以利用Perl2Exe(p2x-8.40-Win32)来将perl脚本编译为一个exe可执行程序,在计划任务中运行这个exe(这需要PerlCRT.dll在系统路径下)。
[注意!]在运行之前,您必须修改“Upload.config”文件以配置所需的重要参数。
外部配置参数
在和perl脚本同一目录下的“Upload.config”配置文件中,是事先配置的10个外部参数:
# 参数1: ftp_server:
# FTP服务器的IP地址。
# 参数2: ftp_dir:
# 指定的FTP上传目录路径;
# 参数3: ftp_uid:
# FTP的登陆用户名;
# 参数4: ftp_pw:
# FTP的登陆密码;
# 参数5: src_dir_WAVFiles,这是一个数组:
? 指定A类文件夹,放置所有要上传的语音文件;
? 注意:这个目录下的子文件夹也会被上传。
# 参数6: src_dir_NamesListFile,这是一个数组:
? 指定B类文件夹,放置B类文件. 注意:这个目录下的子文件夹也会被上传。
# 参数7: ftp_timeout:
# FTP的访问超时时间,以秒为单位;
# 参数8: ftp_retrytimes:
# FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误;
# 参数9: ftp_debug :
# FTP的详细的调试信息输出
# 参数10: maxthreads:
# 启动同时访问FTP站点的线程最大数目
附录:
Upoad.pl内容:
#!/usr/bin/perl -w
####################################################################
#
# 工程项目: FTP自动上传两类文件
#
# 模块名称: FTPAutoUpload
#
# 模块任务: 按照指定的文件夹目录,自动将该文件夹下的所有文件上传到指定ftp站点的指定目录下
#
# 程序名称: Upload.pl
#
# 程序员: Uwe Keim
# 郑昀
# 历史记录:
# 编号 日期 作者 备注
# 1 2000 Uwe Keim 创建
# 2 2005.1.5 郑昀 加入容错处理和读取外部配置文件部分
# 3 2005.1.14 郑昀 加入多线程支持
#
####################################################################
#use strict;
##================================================================##
## 引用的库声明 1
## 读取ini配置文件的库
use Config::IniFiles;
my $cfg = Config::IniFiles->new( -file => "Upload.config" );
## 启动同时访问FTP站点的线程最大数目 ##
$config_maxthreads = $cfg->val('Threads', 'maxthreads') || 1;
##================================================================##
##================================================================##
## 引用的库声明 2
use File::Copy;
use File::stat;
use File::Find;
use Net::FTP;
use Date::Pcalc qw(Delta_DHMS);
use Date::Parse;
#!!use Win32::OLE; # 加入这个库会导致Perl执行线程时崩溃
#!!use Win32::OLE::Variant; # 加入这个库会导致Perl执行线程时崩溃
##================================================================##
##================================================================##
## 引用的库声明 3
# Perl5.6.0已经加入了ithreads支持
use threads; #导入threads多线程处理包
use threads::shared; #使用线程间共享变量
my $MaxThreads = $config_maxthreads; #最多线程数(1-65),perl最多允许64个子线程,加上主线程因此最多65个线程
#此次设置是从配置文件读取的
# 在定义全局并线程间共享的变量时,要这样写
my $CurrentThreads: shared = 0; #当前线程总数
## $total_files是上传文件的数目
my $total_files: shared = 0;
## $processed_files是已上传文件的数目
my $processed_files: shared = 0;
## $skipped_files是跳过文件的数目
my $skipped_files: shared = 0;
## FTP重试次数
my $ftp_retrytimes: shared = 0;
## $g_nIsAllWAVsFile_UploadSuccess代表是否已经完全将语音文件上传,-1为不是,1为是:
my $g_nIsAllWAVsFile_UploadSuccess: shared = 1;
# 因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。
# 如果想让一个变量共享,须要显式地指定它才行。
#
##================================================================##
##================================================================##
## 从配置文件读取外部参数 ##
##
## FTP服务器的IP地址 ##
my $ftp_server = $cfg->val('FTPServer', 'ftp_server') || '';
## 指定的FTP上传目录路径 ##
#! 切记:文件夹最后不要加"/"符号 !#
my $ftp_dir = $cfg->val('FTPServer', 'ftp_dir') || '';
## FTP的登陆用户名 ##
my $ftp_uid = $cfg->val('FTPServer', 'ftp_uid') || '';
## FTP的登陆密码 ##
my $ftp_pw = $cfg->val('FTPServer', 'ftp_pw') || '';
## FTP的访问超时时间,以秒为单位,默认设置为30分钟 ##
my $ftp_timeout = $cfg->val('FTPServer', 'ftp_timeout') || 1800;
####$ftp_timeout = 1800;
## FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 ##
$ftp_retrytimes = $cfg->val('FTPServer', 'ftp_retrytimes') || 3;
####$ftp_retrytimes = 3;
## FTP的详细的调试信息输出 ##
$ftp_debug = $cfg->val('FTPServer', 'ftp_debug') || 0;
####$ftp_debug = 0;
## 指定文件夹“语音文件”,放置所有要上传的语音文件 ##
#! 切记:文件夹最后不要加"\\"符号 !#
my @src_dir_WAVFiles = $cfg->val('SrcDirectory', 'src_dir_WAVFiles');
## 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 ##
#! 切记:文件夹最后不要加"\\"符号 !#
my @src_dir_NamesListFile = $cfg->val('SrcDirectory', 'src_dir_NamesListFile');
## 一个字符串集合,表明哪些类型的文件/文件夹将不被上传到服务器 ##
my @wc_exclude = ("_vti",".log","\\bak","\\data","server.inc");
##================================================================##
##================================================================##
## 记录全部过程的日志文件准备
my $logfilename = 'upload.log';
my $log_cnt = 0;
my $span = 0;
LOG("");
LOG("郑昀(R) 自动上传两类文件 Version 0.1");
LOG("没有版权(C) Linktone 2005-2006。不保留所有权利。");
LOG("");
LOG("用法: Perl FTPUpload.MultiThread.pl");
LOG("");
##================================================================##
##================================================================##
##=== 程序执行的第一步:尝试登陆ftp站点 ==========================##
## $start_date计算出当前开始的时间
my $start_date = timeString(time);
## $g_nUploadSuccess代表是否已经完全上传,-1为不是,1为是:
my $g_nUploadSuccess = 1;
## $g_strLastError代表上次错误原因:
my $g_strLastError = "";
LOG("正在链接至指定FTP服务器($ftp_server)...");
# 第二个参数用于指定超时时间
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
$g_strLastError = "不能连接到FTP服务器,错误原因:".$@;
LOG("$g_strLastError@\n");
$g_nUploadSuccess = -1;
}
else
{
$ftp->login($ftp_uid, $ftp_pw);
if($@)
{
$g_strLastError = "不能登陆FTP服务器,错误原因:".$@;
LOG("$g_strLastError\n");
$g_nUploadSuccess = -1;
}
else
{
LOG("链接FTP服务器成功!");
## 关闭连接
$ftp->quit() or warn "咦,为什么不能够和FTP服务器断开连接呢?: $@\n";
##================================================================##
##================================================================##
##=== 程序执行的第二步,将指定文件夹“语音文件”下所有语音文件上传到FTP站点指定目录下 ===##
#my %lookup;
LOG("准备上传“语音文件”目录(@src_dir_WAVFiles)下的所有文件!");
find(\&processFiles, @src_dir_WAVFiles);
LOG("目录(@src_dir_WAVFiles)已经处理完毕,结果是:");
foreach my $thread (@$self)
{
print("Joining thread\n");
# join() does three things: it waits for a thread to exit,
# cleans up after it, and returns any data the thread may
# have produced.
$$thread->join();
}
##================================================================##
##=== 程序执行的第三步,将指定文件夹“命名对照列表文件TXT”下文件上传到FTP站点指定目录下 ===##
if($g_nIsAllWAVsFile_UploadSuccess > 0)
{
LOG("+===============================+");
LOG("准备上传“语音文件”目录(@src_dir_NamesListFile)下的所有文件!");
@$self = {};
find(\&processFiles, @src_dir_NamesListFile);
foreach my $thread (@$self)
{
print("Joining thread\n");
$$thread->join();
}
LOG("目录(@src_dir_NamesListFile)已经处理完毕,结果是:");
LOG("-===============================-");
}
else
{
LOG("-===============================-");
LOG("由于语音文件目录并没有完全上传,所以本命名对照文件不上传!");
LOG("-===============================-");
}
##================================================================##
##================================================================##
# 日志文件的最后是一个统计报告
$span = calcDeltaSeconds($start_date,timeString(time));
LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");
}
closeLogfile();
}
##================================================================##
##================================================================##
##=== 程序执行的第四步,写成功日志 ===============================##
if($g_nIsAllWAVsFile_UploadSuccess > 0 && $g_nUploadSuccess > 0)
{
$logfilename = 'FTPUpload.MultiThread_Succ_'.shortTimeString(time).'.log';
$log_cnt = 0;
LOG("");
LOG("郑昀(R)自动上传文件 Version 0.1");
LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");
LOG("");
LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");
LOG("");
closeLogfile();
}
##================================================================##
##================================================================##
##=== 程序执行的第四步,写失败日志 ===============================##
if($g_nIsAllWAVsFile_UploadSuccess < 0 || $g_nUploadSuccess < 0)
{
$logfilename = 'FTPUpload.MultiThread_Fail_'.shortTimeString(time).'.log';
$log_cnt = 0;
LOG("");
LOG("郑昀(R)自动上传文件 Version 0.1");
LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");
LOG("");
LOG("上传结果:失败。失败原因:$g_strLastError。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");
LOG("");
closeLogfile();
}
##================================================================##
####################################################################
## 以下是子函数体的定义
##-------------------------------------------------------------##
##
## 函数名称:processFiles
## 功能:
## 得到指定文件夹下的所有文件以及子文件夹,然后依次处理它们。
##
## 程序员: 郑昀(Yun.Zheng@Linktone.com)
##
## 历史记录:
## 编号 日期 作者 备注
## 1 2005.1.4 郑昀
##
##-------------------------------------------------------------##
sub processFiles
{
my $srcdir = fsToBs($File::Find::dir);
my $srcpath = fsToBs($File::Find::name);
my $base = fsToBs($File::Find::topdir);
foreach my $exclude (@wc_exclude) {
if ( index($srcpath, $exclude)>-1 ) {
$File::Find::prune = 1 if -d $srcpath;
return;
}
}
# no DIRECT processing of directories.
if ( -d $srcpath ) {
return;
}
my $dstdir = $srcdir;
my $dstpath = $srcpath;
$dstdir =~ s{\Q$base\E}{$ftp_dir}is;
$dstpath =~ s{\Q$base\E}{$ftp_dir}is;
$dstdir = bsToFs($dstdir);
$dstpath = bsToFs($dstpath);
#-------------------------------------
# 早先版本,单线程逐次处理每一个文件
#processFile($srcpath,$dstpath,$dstdir);
#-------------------------------------
#-------------------------------------
# 20050114版本,多线程并发处理文件
# 每次创建线程处理文件之前,先检查当前已启动的线程数目是否小于最大允许数目
# 如果已经创建了足够多的线程,就不断地轮询
while(1)
{
if($CurrentThreads < $MaxThreads)
{
# 创建线程,传入参数
my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);
#my $thread = threads->create('processFile2',"zhengyun");
# 创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,
# 否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。
push(@$self, \$thread);
#$thread->detach();
last; # 退出循环
}
else
{
# 静候一秒钟,注意这里sleep的参数是以秒为单位的:
LOG("-sleep 1秒钟");
sleep 1;
}
}
#-------------------------------------
}
sub processFile2
{
my ($prime) = @_;
print "Thread started\n".$prime."\n";
}
##-------------------------------------------------------------##
##
## 函数名称:processFile
## 功能:
## 对目标文件进行FTP上传,如果失败就重试若干次。
##
## 程序员: 郑昀(Yun.Zheng@Linktone.com)
##
## 历史记录:
## 编号 日期 作者 备注
## 1 2005.1.4 郑昀
##
##-------------------------------------------------------------##
sub processFile
{
## 当前新建的线程数加一:
{
lock($CurrentThreads);
$CurrentThreads++;
}
my ($srcThread, $dstThread, $dstdirThread) = @_;
## 当前处理的总文件数加一:
{
lock($total_files);
$total_files++;
LOG("正在处理文件 $total_files \"$srcThread\"...");
}
# --------------------
# check time.
my $need_upload = 0;
my $bPutResult = 0;
# create time.
my $t1 = $lookup{$srcThread};
my $t2 = timeString(stat($srcThread)->mtime);
if ( not defined $t1 ) {
$lookup{$srcThread} = $t2;
$need_upload = 1;
} else {
my $delta_sec = calcDeltaSeconds($t1,$t2);
$need_upload = 1 if $delta_sec>5; # 5 seconds as tolerance.
}
# --------------------
if ( $need_upload > 0 )
{
my $nProcessIndex = 1;
for ($nProcessIndex = 1; $nProcessIndex <= $ftp_retrytimes; $nProcessIndex++)
{
my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);
if($@)
{
$g_strLastError = "不能连接到FTP服务器,错误原因:".$@;
LOG("$g_strLastError@\n");
}
else
{
$ftp->login($ftp_uid, $ftp_pw);
if($@)
{
$g_strLastError = "不能登陆FTP服务器,错误原因:".$@;
LOG("$g_strLastError\n");
}
else
{
$ftp->binary;
LOG("第$nProcessIndex次尝试:正在上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\"...当前活动线程数:$CurrentThreads");
{
$bPutResult = 0;
$ftp->mkdir($dstdirThread, 1);
$ftp->put($srcThread, $dstThread) or $bPutResult = -1;
}
if($bPutResult < 0)
{
LOG("第$nProcessIndex次尝试:不能上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\" (dst-dir: \"$dstdirThread\")。\n");
if($@)
{
LOG("第$nProcessIndex次尝试:错误原因是:$@\n");
}
}
else
{
LOG("第$nProcessIndex次尝试:上传文件成功:从源 \"$srcThread\" 到目标 \"$dstThread\"!\n");
{
lock($processed_files);
$processed_files++;
}
## 关闭连接
$ftp->quit() if($ftp);
last; # 退出循环
}
}
}
## 关闭连接
$ftp->quit() if($ftp);
}#for
if($bPutResult < 0)
{
# 尝试了这么多次之后还是不能够上传,报告错误罢了
{
lock($skipped_files);
$skipped_files++;
lock($g_nIsAllWAVsFile_UploadSuccess);
$g_nIsAllWAVsFile_UploadSuccess = -1;
}
}
}
else
{
{
lock($skipped_files);
$skipped_files++;
}
}
## 当前活动的线程数减一:
{
lock($CurrentThreads);
$CurrentThreads--;
}
}
####################################################################
sub bsToFs {
my ($s) = @_;
$s =~ s/\\/\//gis;
return $s;
}
sub fsToBs {
my ($s) = @_;
$s =~ s/\//\\/gis;
return $s;
}
sub timeString {
my ($tm) = @_;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);
}
sub shortTimeString {
my ($tm) = @_;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);
return sprintf("%04d_%02d_%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);
}
# input dates as string "YYYY-MM-DD HH:MM:SS".
# earlier as first parameter, later as second.
sub calcDeltaSeconds {
my ($t1,$t2) = @_;
my ($year1,$month1,$day1,$hh1,$mm1,$ss1) = scanDate($t1);
my ($year2,$month2,$day2,$hh2,$mm2,$ss2) = scanDate($t2);
my ($days, $hours, $minutes, $seconds) = Delta_DHMS(
$year1, $month1, $day1, $hh1, $mm1, $ss1, # earlier.
$year2, $month2, $day2, $hh2, $mm2, $ss2); # later.
return $seconds + $minutes*60 + $hours*60*60 + $days*60*60*24.
}
sub removeFilename {
my ($s) = @_;
my $pos = rindex($s,'\\');
return substr($s, 0, $pos);
}
# format: "2000-09-29 09:09:51".
sub scanDate {
my ($date) = @_;
my ($year, $month, $day, $hour, $minute, $seconds);
$year = substr($date, 0, 4);
$month = substr($date, 5, 2);
$day = substr($date, 8, 2);
$hour = substr($date, 11, 2);
$minute = substr($date, 14, 2);
$seconds = substr($date, 17, 2);
return ($year, $month, $day, $hour, $minute, $seconds);
}
####################################################################
sub LOG {
my ($text) = @_;
my $time = timeString time;
# log to stdout.
print "[$time] $text\n";
# log to logfile.
my $LOG_STEP = 10;
flushLogfile() if ($log_cnt % $LOG_STEP)==0 or $log_cnt==0;
$log_cnt++;
print HLOG "[$time] $text\n";
}
sub openLogfile {
closeLogfile();
open(HLOG,">>$logfilename") or die("打开日志文件出错:文件名为 $logfilename ;错误原因为: $!");
};
sub closeLogfile {
close HLOG if defined HLOG;
}
sub flushLogfile {
closeLogfile();
openLogfile();
}
####################################################################
附录:
Upoad.config内容:
##================================================================##
## 配置的外部参数 ##
##
[FTPServer]
#- FTP服务器的IP地址 -#
ftp_server =
#- 指定的FTP上传目录路径 -#
#! 切记:文件夹最后不要加"/"符号 !#
ftp_dir =
#- FTP的登陆用户名 -#
ftp_uid =
#- FTP的登陆密码 -#
ftp_pw =
#- FTP的访问超时时间,以秒为单位 -#
ftp_timeout =1800
#- FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 -#
ftp_retrytimes =10
#- FTP的详细的调试信息输出 -#
ftp_debug =1
##=====================================================##
##=====================================##
## 配置的外部参数 ##
##
[SrcDirectory]
#- 指定文件夹“语音文件”,放置所有要上传的语音文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_WAVFiles =
#- 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_NamesListFile =
##=========================================##
##========================================##
## 配置的外部参数3 ##
##
[Threads]
#- 启动同时访问FTP站点的线程最大数目 -#
maxthreads =1
##==========================================##