function dmri_fiber_track_prob(mif_file, parcel_dir, mask_file, anatmat_file, process_host, Nworker)
% DMRI_FIBER_TRACK_PROB  Run "streamtrack SD_PROB" for every parcel on one
% or more hosts and build the anatomical matrix file from the results.
%
% [Usage]
%    dmri_fiber_track_prob(mif_file, parcel_dir, mask_file, anatmat_file ...
%                          [, process_host] [, Nworker]);
%
% [Input]
%        mif_file : existing file handed to streamtrack as its first
%                   positional argument.
%      parcel_dir : existing directory that contains 'parcel*.nii.gz' and
%                   'parcel_mm_coord.mat' (the latter must hold 'Nlabel').
%       mask_file : existing mask file. It is copied to the working
%                   directory, gunzip'ed there and passed as "-mask".
%    anatmat_file : output file name given to save_anat_matrix(). Its folder
%                   is used as output directory (created when missing;
%                   the current directory is used when no folder is given).
%    process_host : [optional] host name (char) or cell array of host names.
%                   Default: {'localhost'}. 'localhost' is replaced by the
%                   output of the "hostname" command.
%         Nworker : [optional] number of background MATLAB worker processes
%                   started on this machine. Default: 8.
%
% [Output]
%    none. Side effects: anatmat_file is written by save_anat_matrix() and
%    'parcel_fiber.zip' is created in the output directory. A temporary
%    working directory below the output directory is created and removed
%    again after successful completion.
%
% [Note]
%    When a job log reports a non-zero exit code the function prints the
%    error log file name and returns without raising an error; the working
%    directory is left in place in that case.

%
% --- Argument check
%
if nargin < 4
    error('Please check input arguments.');
end
if exist(mif_file, 'file') ~= 2
    error('Specified mif_file not found.');
end
if exist(parcel_dir, 'dir') ~= 7
    error('Specified parcel directory not found.');
end
if exist(mask_file, 'file') ~= 2
    error('Specified mask_file not found.');
end

output_dir = fileparts(anatmat_file);
if isempty(output_dir)
    % anatmat_file was given without a folder part: use current directory
    % instead of calling mkdir('') / cd('').
    output_dir = pwd;
end
if exist(output_dir, 'dir') ~= 7
    mkdir(output_dir);
end
if ~exist('process_host', 'var') || isempty(process_host)
    process_host = {'localhost'};
end
if ischar(process_host)
    process_host = {process_host};
end
if ~exist('Nworker', 'var') || isempty(Nworker)
    Nworker = 8;
end

%
% --- Locate helper scripts
%
dmri_setenv;
check_cmd     = which('is_cpu_host_available.sh');
cpu_job_throw = which('cpu_job_throw.sh');
timeout_sh    = which('automatic_shutoff.sh');
if isempty(check_cmd)
    error('is_cpu_host_available.sh was not found on the MATLAB path.');
end
if isempty(cpu_job_throw)
    % Without this script no job is ever started and the wait loop for
    % the log directory below would never finish.
    error('cpu_job_throw.sh was not found on the MATLAB path.');
end

%
% --- Check that every host is reachable and can run streamtrack
%
disp('Connecting hosts without password ...');
connection = true;
for k = 1:length(process_host)
    if strcmpi(process_host{k}, 'localhost')
        [a, real_host] = system('hostname');
        process_host{k} = strtrim(real_host);   % drop trailing newline
    end
    fprintf('%s ...', process_host{k});
    [a, ret_msg] = system(['timeout 5 ' check_cmd ' ', process_host{k}]);

    is_ok = true;
    if a == 0 || a == 1
        cmd_track = ['timeout 5 ssh ' process_host{k} ' "', 'bash -c ''source ' getenv('BASH_PROF') ';streamtrack --help''"'];
        [ret_st, ret_msg] = system(cmd_track);
        if ret_st ~= 0
            is_ok = false;
        end
    else
        is_ok = false;
    end
    if is_ok
        disp('OK.');
    else
        connection = false;
        fprintf('Failed.\n %s\n', ret_msg);
    end
end
if connection == false
    error('Please check the setting of ssh connection.');
end

%
% --- Working directory
%
% Convert output_dir to an absolute path.
current_dir = pwd;
cd(output_dir);
output_dir = pwd;
cd(current_dir);

work_dir = tempname(output_dir);
mkdir(work_dir);

% Variables shared with the nested cleanup function terminate_fcn.
process_fiber       = cell(0);  % never filled in this function
kill_remote_process = true;

fprintf('working directory : %s\n', work_dir);
disp('Now preparing for execution...');

%
% --- Copy input files into the working directory
%
copyfile(fullfile(parcel_dir, 'parcel*.nii.gz'), work_dir);
copyfile(mask_file, work_dir);

[mask_path, mask_name, mask_ext] = fileparts(mask_file);
mask_nii_gz_file = fullfile(work_dir, [mask_name, mask_ext]);
mask_nii_file    = strrep(mask_nii_gz_file, '.nii.gz', '.nii');
cmd = ['gunzip ' mask_nii_gz_file];
system(cmd);

parcel_mm_file = fullfile(parcel_dir, 'parcel_mm_coord.mat');
copyfile(parcel_mm_file, work_dir);
% Function form of load: this function contains a nested function, so
% variables should not be created dynamically in its workspace.
parcel_info = load(parcel_mm_file, 'Nlabel');
Nlabel      = parcel_info.Nlabel;

finished_list_dir = fullfile(work_dir, 'finished');
mkdir(finished_list_dir);

%
% --- Job list: one shell script per parcel
%
FT_JOB_FILE = fullfile(work_dir, 'jobs.txt');
fid = fopen(FT_JOB_FILE, 'wt');
if fid == -1, error('Failed to create script.'); end

fiber_tracking_sh_files = cell(0);
for k = 1:Nlabel
    fiber_tracking_sh_files{k} = fullfile(work_dir, ['fiber_track' num2str(k), '.sh']);
    fprintf(fid, '%s\n', fiber_tracking_sh_files{k});
end
fclose(fid);

% Parcels are assigned to the job scripts in random order.
ix = randperm(Nlabel);

for k = 1:Nlabel
    ft_file = fiber_tracking_sh_files{k};

    isok = dmri_script_file_create(ft_file);
    if isok == false, error('Failed to create script.'); end

    % Append the commands to the script created above.
    ft_fid = fopen(ft_file, 'a');
    if ft_fid == -1
        error('File open failed. processing terminated.');
    end

    parcel_n     = ix(k);
    parcel_n_str = num2str(parcel_n);

    % Unzip seed parcel and include (exclusion-named) parcel
    parcel_n_gz_file    = fullfile(work_dir, ['parcel',    parcel_n_str, '.nii.gz']);
    parcel_n_ex_gz_file = fullfile(work_dir, ['parcel_ex', parcel_n_str, '.nii.gz']);
    fprintf(ft_fid, 'gunzip %s\n', parcel_n_gz_file);
    fprintf(ft_fid, 'gunzip %s\n', parcel_n_ex_gz_file);

    tck_file             = fullfile(work_dir, ['parcel', parcel_n_str, '.tck']);
    parcel_n_nii_file    = strrep(parcel_n_gz_file,    '.nii.gz', '.nii');
    parcel_n_ex_nii_file = strrep(parcel_n_ex_gz_file, '.nii.gz', '.nii');

    % Fiber tracking command
    fprintf(ft_fid, ['echo "Fiber tracking - parcel', parcel_n_str, '"\n']);
    fprintf(ft_fid, '%s\n', [timeout_sh, ' ', ...
                             'streamtrack SD_PROB', ...
                             ' ',          mif_file, ...
                             ' -seed ',    parcel_n_nii_file, ...
                             ' -include ', parcel_n_ex_nii_file, ...
                             ' -mask ',    mask_nii_file, ...
                             ' ',          tck_file, ...
                             ' -num 100000 -stop -unidirectional -maxnum 100000 -length 300 -debug']);
    % Create an empty .tck file when streamtrack produced none.
    fprintf(ft_fid, '%s\n', ['if [ ! -e ' tck_file, ' ]; then']);
    fprintf(ft_fid, '%s\n', [' echo -n > ' tck_file ]);
    fprintf(ft_fid, '%s\n', ['fi']);
    % Mark this parcel as finished by creating a directory named after it.
    fprintf(ft_fid, '%s\n', ['mkdir ', fullfile(finished_list_dir, parcel_n_str)]);
    fprintf(ft_fid, '%s\n', ['if [ ! -d ' work_dir ' ]; then']);
    fprintf(ft_fid, 'exit $?;\nfi\n');

    fclose(ft_fid);
end

%
% --- Host list
%
HOSTS_FILE = fullfile(work_dir, 'hosts.txt');
fid = fopen(HOSTS_FILE, 'wt');
if fid == -1, error('hosts file write error.'); end
for k = 1:length(process_host)
    fprintf(fid, '%s\n', process_host{k});
end
fclose(fid);

%
% --- Throw the jobs (runs in the background)
%
cmd     = [cpu_job_throw, ' ', FT_JOB_FILE, ' ', HOSTS_FILE];
cmd_exe = [cmd, ' > ', fullfile(work_dir, 'main_log.txt'), ' &'];
system(cmd_exe);

% Wait until an entry whose name starts with 'log' shows up in work_dir.
while true
    if ~isempty(dir(fullfile(work_dir, 'log*')))
        break;
    end
    pause(1);
end
fprintf('Started processing.\n');

%
% --- Start background MATLAB workers
%
matlab_exe = fullfile(matlabroot, 'bin', 'matlab');
prog_dir   = fileparts(which('vbmeg.m'));
for n = 1:Nworker
    worker_name     = ['worker', num2str(n)];
    worker_script   = fullfile(work_dir, [worker_name, '.m']);
    worker_err_file = fullfile(work_dir, [worker_name, '_error.txt']);

    fid = fopen(worker_script, 'wt');
    if fid == -1, error('Worker file create error.'); end
    fprintf(fid, '%s\n', 'try');
    fprintf(fid, '%s\n', ['cd(''', prog_dir, ''');']);
    fprintf(fid, '%s\n', 'vbmeg;');
    fprintf(fid, '%s\n', ['dmri_fiber_tck_file_worker_start(''', ...
                          work_dir, ''', ' num2str(n), ');']);
    fprintf(fid, '%s\n', 'catch');
    fprintf(fid, '%s\n', ['fid = fopen(''' worker_err_file, ''', ''wt'');']);
    fprintf(fid, '%s\n', 'err = lasterror;');
    % Write the message through '%s' so that '%' or '\' inside the message
    % is not interpreted as a format specification.
    fprintf(fid, '%s\n', 'fprintf(fid, ''%s'', err.message);');
    fprintf(fid, '%s\n', 'fclose(fid);');
    fprintf(fid, '%s\n', 'end');
    fprintf(fid, '%s\n', 'exit;');
    fclose(fid);

    cmd = [matlab_exe, ...
           ' -nodisplay -nojvm -nosplash ', ...
           ' -singleCompThread ', ...
           '-r "cd ', work_dir, '; ' worker_name ';"'];
    [status, result] = system([cmd, '&']);
end

%
% --- Register cleanup (runs terminate_fcn when this function is left)
%
% The object is bound to a named variable so that its lifetime does not
% depend on the implicit 'ans' variable.
cleanup_obj = onCleanup(@terminate_fcn); %#ok<NASGU>

%
% --- Monitor progress
%
mat_files_pre = cell(0);
tic;
has_error = false;
while true
    log_info_list = dmri_fiber_log_info_get(work_dir);

    % Look for jobs that reported a non-zero exit code.
    for l = 1:length(log_info_list)
        result_file = log_info_list(l).result_file;
        err_file    = log_info_list(l).err_file;
        if isempty(result_file), continue; end

        [tmp, msg] = system(['cat ' result_file, '| grep ExitCode=']);
        if isempty(msg), continue; end

        % NOTE(review): assumes that splitting the ExitCode line at '=' and
        % ',' gives the host as 4th and the exit code as 6th token.
        parse     = textscan(msg, '%s', 'delimiter', '=,');
        host      = parse{1}{4};
        exit_code = str2double(parse{1}{6});
        if exit_code ~= 0
            fprintf('errorlog(host:%s):\n%s\n', host, err_file);
            has_error = true;
        end
    end
    % Checked after the loop so that a log entry without an ExitCode line
    % at the end of the list cannot hide an error found earlier.
    if has_error
        return;
    end

    % Report newly created result files.
    d = dir(fullfile(work_dir, 'parcel_fiber*.mat'));
    mat_files_now = {d.name}';

    new_files = setdiff(mat_files_now, mat_files_pre);
    if ~isempty(new_files)
        for j = 1:length(new_files)
            fprintf('Created (%d/%d): %s\n', length(mat_files_pre)+j, Nlabel, new_files{j});
        end
        mat_files_pre = mat_files_now;
    end

    % Finished when all result files exist and the 'finished' directory
    % lists only two entries (i.e. '.' and '..').
    if length(mat_files_now) == Nlabel && length(dir(finished_list_dir)) == 2
        fprintf('Processing (%d/%d)\nDone.\n', Nlabel, Nlabel)
        break;
    end
    pause(5);
end

%
% --- Normal termination: stop workers
%
kill_remote_process = false;
pause(2);
dmri_fiber_tck_file_worker_stop(work_dir);
pause(2);
rmdir(finished_list_dir);

%
% --- Create anatomical matrix file
%
save_anat_matrix(work_dir, Nlabel, 'parcel_fiber', anatmat_file);
fprintf('Created Anatomical matrix file : %s\n', anatmat_file);

% Archive the per-parcel result files next to the anatomical matrix file.
zip(fullfile(output_dir, 'parcel_fiber.zip'), fullfile(work_dir, 'parcel_fiber*.mat'));

% Remove the working directory.
rmdir(work_dir, 's');

toc;

    function terminate_fcn
    % Cleanup routine invoked through onCleanup. Destroys the process
    % objects in process_fiber while kill_remote_process is set and asks
    % the workers of this working directory to stop.
        if kill_remote_process
            for ip = 1:length(process_fiber)
                process_fiber{ip}.destroy();
            end
        end
        dmri_fiber_tck_file_worker_stop(work_dir);
    end

end