matrices.cc
1 //LIC// ====================================================================
2 //LIC// This file forms part of oomph-lib, the object-oriented,
3 //LIC// multi-physics finite-element library, available
4 //LIC// at http://www.oomph-lib.org.
5 //LIC//
6 //LIC// Version 1.0; svn revision $LastChangedRevision$
7 //LIC//
8 //LIC// $LastChangedDate$
9 //LIC//
10 //LIC// Copyright (C) 2006-2016 Matthias Heil and Andrew Hazel
11 //LIC//
12 //LIC// This library is free software; you can redistribute it and/or
13 //LIC// modify it under the terms of the GNU Lesser General Public
14 //LIC// License as published by the Free Software Foundation; either
15 //LIC// version 2.1 of the License, or (at your option) any later version.
16 //LIC//
17 //LIC// This library is distributed in the hope that it will be useful,
18 //LIC// but WITHOUT ANY WARRANTY; without even the implied warranty of
19 //LIC// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 //LIC// Lesser General Public License for more details.
21 //LIC//
22 //LIC// You should have received a copy of the GNU Lesser General Public
23 //LIC// License along with this library; if not, write to the Free Software
24 //LIC// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
25 //LIC// 02110-1301 USA.
26 //LIC//
27 //LIC// The authors may be contacted at oomph-lib@maths.man.ac.uk.
28 //LIC//
29 //LIC//====================================================================
30 //Non-inline member functions for the matrix classes
31 
32 #ifdef OOMPH_HAS_MPI
33 #include "mpi.h"
34 #endif
35 
36 #include <cstring>
37 
38 #include <set>
39 #include <map>
40 
41 //#include <valgrind/callgrind.h>
42 
43 //oomph-lib headers
44 #include "matrices.h"
45 #include "linear_solver.h"
46 
47 
48 namespace oomph
49 {
50 
51 //============================================================================
52 /// Complete LU solve (overwrites RHS with solution). This is the
53 /// generic version which should not need to be over-written.
54 //============================================================================
55 void DoubleMatrixBase::solve(DoubleVector &rhs)
56 {
57 #ifdef PARANOID
58  if(Linear_solver_pt==0)
59  {
60  throw OomphLibError("Linear_solver_pt not set in matrix",
61  OOMPH_CURRENT_FUNCTION,
62  OOMPH_EXCEPTION_LOCATION);
63  }
64 #endif
65 
66  // Copy rhs vector into local storage so it doesn't get overwritten
67  // if the linear solver decides to initialise the solution vector, say,
68  // which it's quite entitled to do!
69  DoubleVector actual_rhs(rhs);
70 
71  //Use the linear algebra interface to the linear solver
72  Linear_solver_pt->solve(this,actual_rhs,rhs);
73 }
74 
75 //============================================================================
76 /// Complete LU solve (Nothing gets overwritten!). This generic
77 /// version should never need to be overwritten
78 //============================================================================
79 void DoubleMatrixBase::solve(const DoubleVector &rhs,
80  DoubleVector &soln)
81 {
82 #ifdef PARANOID
83  if(Linear_solver_pt==0)
84  {
85  throw OomphLibError("Linear_solver_pt not set in matrix",
86  OOMPH_CURRENT_FUNCTION,
87  OOMPH_EXCEPTION_LOCATION);
88  }
89 #endif
90  //Use the linear algebra interface to the linear solver
91  Linear_solver_pt->solve(this,rhs,soln);
92 }
93 
94 //============================================================================
95 /// Complete LU solve (overwrites RHS with solution). This is the
96 /// generic version which should not need to be over-written.
97 //============================================================================
98 void DoubleMatrixBase::solve(Vector<double> &rhs)
99 {
100 #ifdef PARANOID
101  if(Linear_solver_pt==0)
102  {
103  throw OomphLibError("Linear_solver_pt not set in matrix",
104  OOMPH_CURRENT_FUNCTION,
105  OOMPH_EXCEPTION_LOCATION);
106  }
107 #endif
108 
109  // Copy rhs vector into local storage so it doesn't get overwritten
110  // if the linear solver decides to initialise the solution vector, say,
111  // which it's quite entitled to do!
112  Vector<double> actual_rhs(rhs);
113 
114  //Use the linear algebra interface to the linear solver
115  Linear_solver_pt->solve(this,actual_rhs,rhs);
116 }
117 
118 //============================================================================
119 /// Complete LU solve (Nothing gets overwritten!). This generic
120 /// version should never need to be overwritten
121 //============================================================================
122 void DoubleMatrixBase::solve(const Vector<double> &rhs,
123  Vector<double> &soln)
124 {
125 #ifdef PARANOID
126  if(Linear_solver_pt==0)
127  {
128  throw OomphLibError("Linear_solver_pt not set in matrix",
129  OOMPH_CURRENT_FUNCTION,
130  OOMPH_EXCEPTION_LOCATION);
131  }
132 #endif
133  //Use the linear algebra interface to the linear solver
134  Linear_solver_pt->solve(this,rhs,soln);
135 }
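// An illustrative usage sketch (editorial example with made-up values, not
// part of the library source): the generic solve() interface above simply
// forwards to the matrix's linear solver, so for a DenseDoubleMatrix the
// default DenseLU solver does the work.
//
//   DenseDoubleMatrix a(2,2,0.0);
//   a(0,0)=2.0; a(0,1)=1.0;
//   a(1,0)=1.0; a(1,1)=3.0;
//   Vector<double> rhs(2);
//   rhs[0]=3.0; rhs[1]=5.0;
//   a.solve(rhs);   // rhs is overwritten with the solution (0.8, 1.4)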
136 
137 
138 ////////////////////////////////////////////////////////////////////////
139 ////////////////////////////////////////////////////////////////////////
140 ////////////////////////////////////////////////////////////////////////
141 
142 //===============================================================
143 /// Constructor, set the default linear solver to be the DenseLU
144 /// solver
145 //===============================================================
146 DenseDoubleMatrix::DenseDoubleMatrix() : DenseMatrix<double>()
147 {
148  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
149 }
150 
151 //==============================================================
152 /// Constructor to build a square n by n matrix.
153 /// Set the default linear solver to be DenseLU
154 //==============================================================
155 DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n) :
156  DenseMatrix<double>(n)
157 {
158  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
159 }
160 
161 
162 //=================================================================
163 /// Constructor to build a matrix with n rows and m columns.
164 /// Set the default linear solver to be DenseLU
165 //=================================================================
166  DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n,
167  const unsigned long &m) :
168  DenseMatrix<double>(n,m)
169 {
170  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
171 }
172 
173 //=====================================================================
174 /// Constructor to build a matrix with n rows and m columns,
175 /// with initial value initial_val
176 /// Set the default linear solver to be DenseLU
177 //=====================================================================
178 DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n,
179  const unsigned long &m,
180  const double &initial_val) :
181  DenseMatrix<double>(n,m,initial_val)
182 {
183  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
184 }
185 
186 //=======================================================================
187 /// Destructor: delete the default linear solver
188 //======================================================================
189 DenseDoubleMatrix::~DenseDoubleMatrix()
190 {
191  //Delete the default linear solver
192  delete Default_linear_solver_pt;
193 }
194 
195 //============================================================================
196 /// LU decompose a matrix, by using the default linear solver
197 /// (DenseLU)
198 //============================================================================
199 void DenseDoubleMatrix::ludecompose()
200 {
201  //Use the default (DenseLU) solver to ludecompose the matrix
202  static_cast<DenseLU*>(Default_linear_solver_pt)->factorise(this);
203 }
204 
205 
206 //============================================================================
207 /// Back substitute an LU decomposed matrix.
208 //============================================================================
209 void DenseDoubleMatrix::lubksub(DoubleVector &rhs)
210 {
211  //Use the default (DenseLU) solver to perform the backsubstitution
212  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs,rhs);
213 }
214 
215 //============================================================================
216 /// Back substitute an LU decomposed matrix.
217 //============================================================================
218 void DenseDoubleMatrix::lubksub(Vector<double> &rhs)
219 {
220  //Use the default (DenseLU) solver to perform the backsubstitution
221  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs,rhs);
222 }
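// An illustrative sketch (editorial example, made-up values): ludecompose()
// and lubksub() let one LU factorisation be reused for several right-hand
// sides instead of calling solve() repeatedly.
//
//   DenseDoubleMatrix a(2,2,0.0);
//   a(0,0)=4.0; a(1,1)=2.0;
//   a.ludecompose();             // factorise once
//   Vector<double> rhs1(2,1.0);
//   Vector<double> rhs2(2,2.0);
//   a.lubksub(rhs1);             // rhs1 becomes (0.25, 0.5)
//   a.lubksub(rhs2);             // rhs2 becomes (0.5, 1.0)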
223 
224 
225 //============================================================================
226 /// Determine eigenvalues and eigenvectors, using
227 /// Jacobi rotations. Only for symmetric matrices. Nothing gets overwritten!
228 /// - \c eigen_vect(i,j) = j-th component of i-th eigenvector.
229 /// - \c eigen_val[i] is the i-th eigenvalue; same ordering as in eigenvectors
230 //============================================================================
231 void DenseDoubleMatrix::eigenvalues_by_jacobi(Vector<double> &eigen_vals,
232  DenseMatrix<double> &eigen_vect)
233  const
234 {
235 #ifdef PARANOID
236  // Check Matrix is square
237  if (N!=M)
238  {
239  throw OomphLibError(
240  "This matrix is not square, the matrix MUST be square!",
241  OOMPH_CURRENT_FUNCTION,
242  OOMPH_EXCEPTION_LOCATION);
243  }
244 #endif
245  // Make a copy of the matrix & check that it's symmetric
246 
247  // Check that the sizes of eigen_vals and eigen_vect are correct. If not
248  // correct them.
249  if (eigen_vals.size()!=N)
250  {
251  eigen_vals.resize(N);
252  }
253  if (eigen_vect.ncol()!=N || eigen_vect.nrow()!=N)
254  {
255  eigen_vect.resize(N);
256  }
257 
258  DenseDoubleMatrix working_matrix(N);
259  for (unsigned long i=0;i<N;i++)
260  {
261  for (unsigned long j=0;j<M;j++)
262  {
263 #ifdef PARANOID
264  if (Matrixdata[M*i+j]!=Matrixdata[M*j+i])
265  {
266  throw OomphLibError(
267  "Matrix needs to be symmetric for eigenvalues_by_jacobi()",
268 OOMPH_CURRENT_FUNCTION,
269  OOMPH_EXCEPTION_LOCATION);
270  }
271 #endif
272  working_matrix(i,j)=(*this)(i,j);
273  }
274  }
275 
276  DenseDoubleMatrix aux_eigen_vect(N);
277 
278  throw OomphLibError(
279  "Sorry JacobiEigenSolver::jacobi() removed because of licensing problems.",
280  OOMPH_CURRENT_FUNCTION,
281  OOMPH_EXCEPTION_LOCATION);
282 
283  // // Call eigensolver
284  // unsigned long nrot;
285  // JacobiEigenSolver::jacobi(working_matrix, eigen_vals, aux_eigen_vect,
286  // nrot);
287 
288  // Copy across (and transpose)
289  for (unsigned long i=0;i<N;i++)
290  {
291  for (unsigned long j=0;j<M;j++)
292  {
293  eigen_vect(i,j)=aux_eigen_vect(j,i);
294  }
295  }
296 }
297 
298 
299 //============================================================================
300 /// Multiply the matrix by the vector x: soln=Ax
301 //============================================================================
302 void DenseDoubleMatrix::multiply
303 (const DoubleVector &x, DoubleVector &soln) const
304 {
305 #ifdef PARANOID
306  // Check to see if x.size() = ncol().
307  if (x.nrow()!=this->ncol())
308  {
309  std::ostringstream error_message_stream;
310  error_message_stream
311  << "The x vector is not the right size. It is " << x.nrow()
312  << ", it should be " << this->ncol() << std::endl;
313  throw OomphLibError(error_message_stream.str(),
314 OOMPH_CURRENT_FUNCTION,
315  OOMPH_EXCEPTION_LOCATION);
316  }
317  // check that x is not distributed
318  if (x.distributed())
319  {
320  std::ostringstream error_message_stream;
321  error_message_stream
322  << "The x vector cannot be distributed for DenseDoubleMatrix "
323  << "matrix-vector multiply" << std::endl;
324  throw OomphLibError(error_message_stream.str(),
325 OOMPH_CURRENT_FUNCTION,
326  OOMPH_EXCEPTION_LOCATION);
327  }
328  // if soln is setup...
329  if (soln.built())
330  {
331  // check that soln is not distributed
332  if (soln.distributed())
333  {
334  std::ostringstream error_message_stream;
335  error_message_stream
336  << "The soln vector cannot be distributed for DenseDoubleMatrix "
337  << "matrix-vector multiply" << std::endl;
338  throw OomphLibError(error_message_stream.str(),
339 OOMPH_CURRENT_FUNCTION,
340  OOMPH_EXCEPTION_LOCATION);
341  }
342  if (soln.nrow() != this->nrow())
343  {
344  std::ostringstream error_message_stream;
345  error_message_stream
346  << "The soln vector is setup and therefore must have the same "
347  << "number of rows as the matrix";
348  throw OomphLibError(error_message_stream.str(),
349 OOMPH_CURRENT_FUNCTION,
350  OOMPH_EXCEPTION_LOCATION);
351  }
352  if (*x.distribution_pt()->communicator_pt() !=
353  *soln.distribution_pt()->communicator_pt())
354  {
355  std::ostringstream error_message_stream;
356  error_message_stream
357  << "The soln vector and the x vector must have the same communicator"
358  << std::endl;
359  throw OomphLibError(error_message_stream.str(),
360 OOMPH_CURRENT_FUNCTION,
361  OOMPH_EXCEPTION_LOCATION);
362  }
363  }
364 #endif
365 
366  // if soln is not setup then setup the distribution
367  if (!soln.built())
368  {
369  LinearAlgebraDistribution dist(x.distribution_pt()->communicator_pt(),
370  this->nrow(),false);
371  soln.build(&dist,0.0);
372  }
373 
374  // Initialise the solution
375  soln.initialise(0.0);
376 
377  // Multiply the matrix A, by the vector x
378  const double* x_pt = x.values_pt();
379  double* soln_pt = soln.values_pt();
380  for (unsigned long i=0;i<N;i++)
381  {
382  for (unsigned long j=0;j<M;j++)
383  {
384  soln_pt[i] += Matrixdata[M*i+j]*x_pt[j];
385  }
386  }
387 }
388 
389 
390 //=================================================================
391 /// Multiply the transposed matrix by the vector x: soln=A^T x
392 //=================================================================
393 void DenseDoubleMatrix::multiply_transpose(const DoubleVector &x,
394  DoubleVector &soln) const
395 {
396 #ifdef PARANOID
397  // Check to see if x.size() = ncol().
398  if (x.nrow()!=this->nrow())
399  {
400  std::ostringstream error_message_stream;
401  error_message_stream
402  << "The x vector is not the right size. It is " << x.nrow()
403  << ", it should be " << this->nrow() << std::endl;
404  throw OomphLibError(error_message_stream.str(),
405 OOMPH_CURRENT_FUNCTION,
406  OOMPH_EXCEPTION_LOCATION);
407  }
408  // check that x is not distributed
409  if (x.distributed())
410  {
411  std::ostringstream error_message_stream;
412  error_message_stream
413  << "The x vector cannot be distributed for DenseDoubleMatrix "
414  << "matrix-vector multiply" << std::endl;
415  throw OomphLibError(error_message_stream.str(),
416 OOMPH_CURRENT_FUNCTION,
417  OOMPH_EXCEPTION_LOCATION);
418  }
419  // if soln is setup...
420  if (soln.built())
421  {
422  // check that soln is not distributed
423  if (soln.distributed())
424  {
425  std::ostringstream error_message_stream;
426  error_message_stream
427  << "The soln vector cannot be distributed for DenseDoubleMatrix "
428  << "matrix-vector multiply" << std::endl;
429  throw OomphLibError(error_message_stream.str(),
430 OOMPH_CURRENT_FUNCTION,
431  OOMPH_EXCEPTION_LOCATION);
432  }
433  if (soln.nrow() != this->ncol())
434  {
435  std::ostringstream error_message_stream;
436  error_message_stream
437  << "The soln vector is setup and therefore must have the same "
438  << "number of columns as the matrix";
439  throw OomphLibError(error_message_stream.str(),
440 OOMPH_CURRENT_FUNCTION,
441  OOMPH_EXCEPTION_LOCATION);
442  }
443  if (*soln.distribution_pt()->communicator_pt() !=
444  *x.distribution_pt()->communicator_pt())
445  {
446  std::ostringstream error_message_stream;
447  error_message_stream
448  << "The soln vector and the x vector must have the same communicator"
449  << std::endl;
450  throw OomphLibError(error_message_stream.str(),
451 OOMPH_CURRENT_FUNCTION,
452  OOMPH_EXCEPTION_LOCATION);
453  }
454  }
455 #endif
456 
457  // if soln is not setup then setup the distribution
458  if (!soln.built())
459  {
460  LinearAlgebraDistribution* dist_pt =
461  new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
462  this->ncol(),false);
463  soln.build(dist_pt,0.0);
464  delete dist_pt;
465  }
466 
467  // Initialise the solution
468  soln.initialise(0.0);
469 
470  // Matrix vector product
471  double* soln_pt = soln.values_pt();
472  const double* x_pt = x.values_pt();
473  for (unsigned long i=0;i<N;i++)
474  {
475  for (unsigned long j=0;j<M;j++)
476  {
477  soln_pt[j] += Matrixdata[M*i+j]*x_pt[i];
478  }
479  }
480 }
481 
482 
483 
484 //=================================================================
485 /// For every row, find the maximum absolute value of the
486 /// entries in this row. Set all values that are less than alpha times
487 /// this maximum to zero and return the resulting matrix in
488 /// reduced_matrix. Note: Diagonal entries are retained regardless
489 /// of their size.
490 //=================================================================
491 void DenseDoubleMatrix::matrix_reduction(const double &alpha,
492  DenseDoubleMatrix &reduced_matrix)
493 {
494 
495  reduced_matrix.resize(N,M,0.0);
496  // maximum value in a row
497  double max_row;
498 
499  // Loop over rows
500  for(unsigned i=0;i<N;i++)
501  {
502  // Initialise max value in row
503  max_row=0.0;
504 
505  //Loop over entries in columns
506  for(unsigned long j=0;j<M;j++)
507  {
508  // Find max. value in row
509  if(std::fabs( Matrixdata[M*i+j])>max_row)
510  {
511  max_row=std::fabs( Matrixdata[M*i+j]);
512  }
513  }
514 
515  // Decide if we need to retain the entries in the row
516  for(unsigned long j=0;j<M;j++)
517  {
518  // If we're on the diagonal or the value is sufficiently large: retain
519  // i.e. copy across.
520  if(i==j || std::fabs(Matrixdata[M*i+j])>alpha*max_row )
521  {
522  reduced_matrix(i,j) =Matrixdata[M*i+j];
523  }
524  }
525  }
526 
527 }
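// An illustrative sketch (editorial example, made-up values): dropping small
// off-diagonal entries with matrix_reduction(). With alpha=0.5 every entry
// smaller than half of its row maximum is discarded; diagonal entries are
// always retained.
//
//   DenseDoubleMatrix a(2,2,0.0);
//   a(0,0)=10.0; a(0,1)=0.1;
//   a(1,0)=0.2;  a(1,1)=5.0;
//   DenseDoubleMatrix reduced;
//   a.matrix_reduction(0.5,reduced);   // reduced = diag(10,5)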
528 
529 
530 //=============================================================================
531 /// Function to multiply this matrix by the DenseDoubleMatrix matrix_in.
532 //=============================================================================
533 void DenseDoubleMatrix::multiply(const DenseDoubleMatrix &matrix_in,
534  DenseDoubleMatrix& result)
535 {
536 #ifdef PARANOID
537  // check matrix dimensions are compatible
538  if ( this->ncol() != matrix_in.nrow() )
539  {
540  std::ostringstream error_message;
541  error_message
542  << "Matrix dimensions incompatible for matrix-matrix multiplication\n"
543  << "ncol() for first matrix:" << this->ncol()
544  << "nrow() for second matrix: " << matrix_in.nrow();
545 
546  throw OomphLibError(error_message.str(),
547 OOMPH_CURRENT_FUNCTION,
548  OOMPH_EXCEPTION_LOCATION);
549  }
550 #endif
551 
552  // NB N is number of rows!
553  unsigned long n_row = this->nrow();
554  unsigned long m_col = matrix_in.ncol();
555 
556  // resize and initialize result
557  result.resize(n_row, m_col, 0.0);
558 
559  //clock_t clock1 = clock();
560 
561  // do calculation
562  unsigned long n_col=this->ncol();
563  for (unsigned long k=0; k<n_col; k++)
564  {
565  for (unsigned long i=0; i<n_row; i++)
566  {
567  for (unsigned long j=0; j<m_col; j++)
568  {
569  result(i,j) += Matrixdata[n_col*i+k] * matrix_in(k,j);
570  }
571  }
572  }
573 }
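// An illustrative sketch (editorial example, made-up values): forming the
// dense matrix-matrix product C = A*B via the multiply() member defined
// above.
//
//   DenseDoubleMatrix a(2,2,0.0), b(2,2,0.0), c;
//   a(0,0)=1.0; a(0,1)=2.0; a(1,1)=1.0;
//   b(0,0)=3.0; b(1,1)=4.0;
//   a.multiply(b,c);   // c = [[3,8],[0,4]]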
574 
575 
576 
577 ///////////////////////////////////////////////////////////////////////////////
578 ///////////////////////////////////////////////////////////////////////////////
579 ///////////////////////////////////////////////////////////////////////////////
580 
581 
582 //=======================================================================
583 /// \short Default constructor, set the default linear solver and
584 /// matrix-matrix multiplication method.
585 //========================================================================
586  CCDoubleMatrix::CCDoubleMatrix() : CCMatrix<double>()
587  {
588   Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
589   Matrix_matrix_multiply_method = 2;
590  }
591 
592 //========================================================================
593  /// \short Constructor: Pass vector of values, vector of row indices,
594  /// vector of column starts and number of rows (can be suppressed
595  /// for square matrices). Number of nonzero entries is read
596  /// off from value, so make sure the vector has been shrunk
597  /// to its correct length.
598 //=======================================================================
599  CCDoubleMatrix::CCDoubleMatrix(const Vector<double>& value,
600  const Vector<int>& row_index,
601  const Vector<int>& column_start,
602  const unsigned long &n,
603  const unsigned long &m) :
604  CCMatrix<double>(value,row_index,column_start,n,m)
605  {
606   Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
607   Matrix_matrix_multiply_method = 2;
608  }
609 
610  /// Destructor: delete the default linear solver
611  CCDoubleMatrix::~CCDoubleMatrix() {delete Default_linear_solver_pt;}
612 
613 
614 //===================================================================
615 /// Perform LU decomposition. Return the sign of the determinant
616 //===================================================================
617 void CCDoubleMatrix::ludecompose()
618 {
619  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
620 }
621 
622 //===================================================================
623 /// Do the backsubstitution
624 //===================================================================
625 void CCDoubleMatrix::lubksub(DoubleVector &rhs)
626 {
627  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->backsub(rhs,rhs);
628 }
629 
630 //===================================================================
631 /// Multiply the matrix by the vector x
632 //===================================================================
633 void CCDoubleMatrix::multiply(const DoubleVector &x, DoubleVector &soln) const
634 {
635 #ifdef PARANOID
636  // Check to see if x.size() = ncol().
637  if (x.nrow()!=this->ncol())
638  {
639  std::ostringstream error_message_stream;
640  error_message_stream
641  << "The x vector is not the right size. It is " << x.nrow()
642  << ", it should be " << this->ncol() << std::endl;
643  throw OomphLibError(error_message_stream.str(),
644 OOMPH_CURRENT_FUNCTION,
645  OOMPH_EXCEPTION_LOCATION);
646  }
647  // check that x is not distributed
648  if (x.distributed())
649  {
650  std::ostringstream error_message_stream;
651  error_message_stream
652  << "The x vector cannot be distributed for CCDoubleMatrix "
653  << "matrix-vector multiply" << std::endl;
654  throw OomphLibError(error_message_stream.str(),
655 OOMPH_CURRENT_FUNCTION,
656  OOMPH_EXCEPTION_LOCATION);
657  }
658  // if soln is setup...
659  if (soln.built())
660  {
661  // check that soln is not distributed
662  if (soln.distributed())
663  {
664  std::ostringstream error_message_stream;
665  error_message_stream
666  << "The soln vector cannot be distributed for CCDoubleMatrix "
667  << "matrix-vector multiply" << std::endl;
668  throw OomphLibError(error_message_stream.str(),
669 OOMPH_CURRENT_FUNCTION,
670  OOMPH_EXCEPTION_LOCATION);
671  }
672  if (soln.nrow() != this->nrow())
673  {
674  std::ostringstream error_message_stream;
675  error_message_stream
676  << "The soln vector is setup and therefore must have the same "
677  << "number of rows as the matrix";
678  throw OomphLibError(error_message_stream.str(),
679 OOMPH_CURRENT_FUNCTION,
680  OOMPH_EXCEPTION_LOCATION);
681  }
682  if (*soln.distribution_pt()->communicator_pt() !=
683  *x.distribution_pt()->communicator_pt())
684  {
685  std::ostringstream error_message_stream;
686  error_message_stream
687  << "The soln vector and the x vector must have the same communicator"
688  << std::endl;
689  throw OomphLibError(error_message_stream.str(),
690 OOMPH_CURRENT_FUNCTION,
691  OOMPH_EXCEPTION_LOCATION);
692  }
693  }
694 #endif
695 
696  // if soln is not setup then setup the distribution
697  if (!soln.built())
698  {
699  LinearAlgebraDistribution* dist_pt =
700  new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
701  this->nrow(),false);
702  soln.build(dist_pt,0.0);
703  delete dist_pt;
704  }
705 
706  // zero
707  soln.initialise(0.0);
708 
709  // multiply
710  double* soln_pt = soln.values_pt();
711  const double* x_pt = x.values_pt();
712  for (unsigned long j=0;j<N;j++)
713  {
714  for (long k=Column_start[j];k<Column_start[j+1];k++)
715  {
716  unsigned long i = Row_index[k];
717  double a_ij = Value[k];
718  soln_pt[i] += a_ij*x_pt[j];
719  }
720  }
721 }
722 
723 
724 
725 
726 //=================================================================
727 /// Multiply the transposed matrix by the vector x: soln=A^T x
728 //=================================================================
729 void CCDoubleMatrix::multiply_transpose(const DoubleVector &x,
730  DoubleVector &soln) const
731 {
732 #ifdef PARANOID
733  // Check to see if x.size() = ncol().
734  if (x.nrow()!=this->nrow())
735  {
736  std::ostringstream error_message_stream;
737  error_message_stream
738  << "The x vector is not the right size. It is " << x.nrow()
739  << ", it should be " << this->nrow() << std::endl;
740  throw OomphLibError(error_message_stream.str(),
741 OOMPH_CURRENT_FUNCTION,
742  OOMPH_EXCEPTION_LOCATION);
743  }
744  // check that x is not distributed
745  if (x.distributed())
746  {
747  std::ostringstream error_message_stream;
748  error_message_stream
749  << "The x vector cannot be distributed for CCDoubleMatrix "
750  << "matrix-vector multiply" << std::endl;
751  throw OomphLibError(error_message_stream.str(),
752 OOMPH_CURRENT_FUNCTION,
753  OOMPH_EXCEPTION_LOCATION);
754  }
755  // if soln is setup...
756  if (soln.built())
757  {
758  // check that soln is not distributed
759  if (soln.distributed())
760  {
761  std::ostringstream error_message_stream;
762  error_message_stream
763  << "The soln vector cannot be distributed for CCDoubleMatrix "
764  << "matrix-vector multiply" << std::endl;
765  throw OomphLibError(error_message_stream.str(),
766 OOMPH_CURRENT_FUNCTION,
767  OOMPH_EXCEPTION_LOCATION);
768  }
769  if (soln.nrow() != this->ncol())
770  {
771  std::ostringstream error_message_stream;
772  error_message_stream
773  << "The soln vector is setup and therefore must have the same "
774  << "number of columns as the matrix";
775  throw OomphLibError(error_message_stream.str(),
776 OOMPH_CURRENT_FUNCTION,
777  OOMPH_EXCEPTION_LOCATION);
778  }
779  if (*soln.distribution_pt()->communicator_pt() !=
780  *x.distribution_pt()->communicator_pt())
781  {
782  std::ostringstream error_message_stream;
783  error_message_stream
784  << "The soln vector and the x vector must have the same communicator"
785  << std::endl;
786  throw OomphLibError(error_message_stream.str(),
787 OOMPH_CURRENT_FUNCTION,
788  OOMPH_EXCEPTION_LOCATION);
789  }
790  }
791 #endif
792 
793  // if soln is not setup then setup the distribution
794  if (!soln.built())
795  {
796  LinearAlgebraDistribution* dist_pt =
797  new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
798  this->ncol(),false);
799  soln.build(dist_pt,0.0);
800  delete dist_pt;
801  }
802 
803  // zero
804  soln.initialise(0.0);
805 
806  // Matrix vector product
807  double* soln_pt = soln.values_pt();
808  const double* x_pt = x.values_pt();
809  for (unsigned long i=0;i<N;i++)
810  {
811 
812  for (long k=Column_start[i];k<Column_start[i+1];k++)
813  {
814  unsigned long j=Row_index[k];
815  double a_ij=Value[k];
816  soln_pt[j]+=a_ij*x_pt[i];
817  }
818  }
819 }
820 
821 
822 
823 
824 //===========================================================================
825 /// Function to multiply this matrix by the CCDoubleMatrix matrix_in
826 /// The multiplication method used can be selected using the flag
827 /// Matrix_matrix_multiply_method. By default Method 2 is used.
828 /// Method 1: First runs through this matrix and matrix_in to find the storage
829 /// requirements for result - arrays of the correct size are
830 /// then allocated before performing the calculation.
831 /// Minimises memory requirements but more costly.
832 /// Method 2: Grows storage for values and column indices of result 'on the
833 /// fly' using an array of maps. Faster but more memory
834 /// intensive.
835 /// Method 3: Grows storage for values and column indices of result 'on the
836 /// fly' using a vector of vectors. Not particularly impressive
837 /// on the platforms we tried...
838 //=============================================================================
839 void CCDoubleMatrix::multiply(const CCDoubleMatrix& matrix_in,
840  CCDoubleMatrix& result)
841 {
842 #ifdef PARANOID
843  // check matrix dimensions are compatible
844  if ( this->ncol() != matrix_in.nrow() )
845  {
846  std::ostringstream error_message;
847  error_message
848  << "Matrix dimensions incompatible for matrix-matrix multiplication\n"
849  << "ncol() for first matrix:" << this->ncol()
850  << "nrow() for second matrix: " << matrix_in.nrow();
851 
852  throw OomphLibError(error_message.str(),
853 OOMPH_CURRENT_FUNCTION,
854  OOMPH_EXCEPTION_LOCATION);
855  }
856 #endif
857 
858  // NB N is number of rows!
859  unsigned long N = this->nrow();
860  unsigned long M = matrix_in.ncol();
861  unsigned long Nnz = 0;
862 
863  // pointers to arrays which store result
864  int* Column_start;
865  double* Value;
866  int* Row_index;
867 
868  // get pointers to matrix_in
869  const int* matrix_in_col_start = matrix_in.column_start();
870  const int* matrix_in_row_index = matrix_in.row_index();
871  const double* matrix_in_value = matrix_in.value();
872 
873  // get pointers to this matrix
874  const double* this_value = this->value();
875  const int* this_col_start = this->column_start();
876  const int* this_row_index = this->row_index();
877 
878  // set method
879  unsigned method = Matrix_matrix_multiply_method;
880 
881  // clock_t clock1 = clock();
882 
883  // METHOD 1
884  // --------
885  if (method==1)
886  {
887  // allocate storage for column starts
888  Column_start = new int[M+1];
889  Column_start[0]=0;
890 
891  // a set to store number of non-zero rows in each column of result
892  std::set<unsigned> rows;
893 
894  // run through columns of this matrix and matrix_in to find number of
895  // non-zero entries in each column of result
896  for (unsigned long this_col = 0; this_col<M; this_col++)
897  {
898  // run through non-zeros in this_col of this matrix
899  for (int this_ptr = this_col_start[this_col];
900  this_ptr < this_col_start[this_col+1];
901  this_ptr++)
902  {
903  // find row index for non-zero
904  unsigned matrix_in_col = this_row_index[this_ptr];
905 
906  // run through corresponding column in matrix_in
907  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
908  matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
909  matrix_in_ptr++)
910  {
911  // find row index for non-zero in matrix_in and store in rows
912  rows.insert(matrix_in_row_index[matrix_in_ptr]);
913  }
914  }
915  // update Column_start
916  Column_start[this_col+1] = Column_start[this_col] + rows.size();
917 
918  // wipe values in rows
919  rows.clear();
920  }
921 
922  // set Nnz
923  Nnz = Column_start[M];
924 
925  // allocate arrays for result
926  Value = new double[Nnz];
927  Row_index = new int[Nnz];
928 
929  // set all values of Row_index to -1
930  for (unsigned long i=0;i<Nnz;i++)
931  Row_index[i] = -1;
932 
933  // Calculate values for result - first run through columns of this matrix
934  for (unsigned long this_col = 0; this_col<M; this_col++)
935  {
936  // run through non-zeros in this_column
937  for (int this_ptr = this_col_start[this_col];
938  this_ptr < this_col_start[this_col+1];
939  this_ptr++)
940  {
941  // find value of non-zero
942  double this_val = this_value[this_ptr];
943 
944  // find row associated with non-zero
945  unsigned matrix_in_col = this_row_index[this_ptr];
946 
947  // run through corresponding column in matrix_in
948  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
949  matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
950  matrix_in_ptr++)
951  {
952  // find row index for non-zero in matrix_in
953  int row = matrix_in_row_index[matrix_in_ptr];
954 
955  // find position in result to insert value
956  for(int ptr = Column_start[this_col];
957  ptr <= Column_start[this_col+1];
958  ptr++)
959  {
960  if (ptr == Column_start[this_col+1])
961  {
962  // error - have passed end of column without finding
963  // correct row index
964  std::ostringstream error_message;
965  error_message << "Error inserting value in result";
966 
967  throw OomphLibError(error_message.str(),
968  OOMPH_CURRENT_FUNCTION,
969  OOMPH_EXCEPTION_LOCATION);
970  }
971  else if (Row_index[ptr] == -1 )
972  {
973  // first entry for this row index
974  Row_index[ptr] = row;
975  Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
976  break;
977  }
978  else if ( Row_index[ptr] == row )
979  {
980  // row index already exists - add value
981  Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
982  break;
983  }
984  }
985  }
986  }
987  }
988  }
989 
990  // METHOD 2
991  // --------
992  else if (method==2)
993  {
994  // generate array of maps to store values for result
995  std::map<int,double>* result_maps = new std::map<int,double>[M];
996 
997  // run through columns of this matrix
998  for (unsigned long this_col = 0; this_col<M; this_col++)
999  {
1000  // run through non-zeros in this_col
1001  for (int this_ptr = this_col_start[this_col];
1002  this_ptr < this_col_start[this_col+1];
1003  this_ptr++)
1004  {
1005  // find value of non-zero
1006  double this_val = this_value[this_ptr];
1007 
1008  // find row index associated with non-zero
1009  unsigned matrix_in_col = this_row_index[this_ptr];
1010 
1011  // run through corresponding column in matrix_in
1012  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
1013  matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
1014  matrix_in_ptr++)
1015  {
1016  // find row index for non-zero in matrix_in
1017  int row = matrix_in_row_index[matrix_in_ptr];
1018 
1019  // insert value
1020  result_maps[this_col][row] +=
1021  this_val * matrix_in_value[matrix_in_ptr];
1022  }
1023  }
1024  }
1025 
1026  // allocate Column_start
1027  Column_start = new int[M+1];
1028 
1029  // copy across column starts
1030  Column_start[0] = 0;
1031  for (unsigned long col=0; col<M; col++)
1032  {
1033  int size = result_maps[col].size();
1034  Column_start[col+1] = Column_start[col] + size;
1035  }
1036 
1037  // set Nnz
1038  Nnz = Column_start[M];
1039 
1040  // allocate other arrays
1041  Value = new double[Nnz];
1042  Row_index = new int[Nnz];
1043 
1044  // copy values and row indices
1045  for (unsigned long col=0; col<M; col++)
1046  {
1047  unsigned ptr = Column_start[col];
1048  for (std::map<int,double>::iterator i = result_maps[col].begin();
1049  i != result_maps[col].end();
1050  i ++)
1051  {
1052  Row_index[ptr]= i->first;
1053  Value[ptr] = i->second;
1054  ptr++;
1055  }
1056  }
1057 
1058  // tidy up memory
1059  delete[] result_maps;
1060  }
1061 
1062  // METHOD 3
1063  // --------
1064  else if (method==3)
1065  {
1066  // vectors of vectors to store results
1067  std::vector< std::vector<int> > result_rows(N);
1068  std::vector< std::vector<double> > result_vals(N);
1069 
1070  // run through the columns of this matrix
1071  for (unsigned long this_col = 0; this_col<M; this_col++)
1072  {
1073  // run through non-zeros in this_col
1074  for (int this_ptr = this_col_start[this_col];
1075  this_ptr < this_col_start[this_col+1];
1076  this_ptr++)
1077  {
1078  // find value of non-zero
1079  double this_val = this_value[this_ptr];
1080 
1081  // find row index associated with non-zero
1082  unsigned matrix_in_col = this_row_index[this_ptr];
1083 
1084  // run through corresponding column in matrix_in
1085  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
1086  matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
1087  matrix_in_ptr++)
1088  {
1089  // find row index for non-zero in matrix_in
1090  int row = matrix_in_row_index[matrix_in_ptr];
1091 
1092  // insert value
1093  int size = result_rows[this_col].size();
1094  for (int i = 0; i<=size; i++)
1095  {
1096  if (i==size)
1097  {
1098  // first entry for this row index
1099  result_rows[this_col].push_back(row);
1100  result_vals[this_col].push_back(
1101  this_val*matrix_in_value[matrix_in_ptr]);
1102  }
1103  else if (row==result_rows[this_col][i])
1104  {
1105  // row index already exists
1106  result_vals[this_col][i] +=
1107  this_val * matrix_in_value[matrix_in_ptr];
1108  break;
1109  }
1110  }
1111  }
1112  }
1113  }
1114 
1115  // allocate Column_start
1116  Column_start = new int[M+1];
1117 
1118  // copy across column starts
1119  Column_start[0] = 0;
1120  for (unsigned long col=0; col<M; col++)
1121  {
1122  int size = result_rows[col].size();
1123  Column_start[col+1] = Column_start[col] + size;
1124  }
1125 
1126  // set Nnz
1127  Nnz = Column_start[M];
1128 
1129  // allocate other arrays
1130  Value = new double[Nnz];
1131  Row_index = new int[Nnz];
1132 
1133  // copy across values and row indices
1134  for (unsigned long col=0; col<N; col++)
1135  {
1136  unsigned ptr = Column_start[col];
1137  unsigned n_rows=result_rows[col].size();
1138  for (unsigned i = 0; i < n_rows ; i++)
1139  {
1140  Row_index[ptr] = result_rows[col][i];
1141  Value[ptr] = result_vals[col][i];
1142  ptr++;
1143  }
1144  }
1145  }
1146 
1147  // INCORRECT VALUE FOR METHOD
1148  else
1149  {
1150  std::ostringstream error_message;
1151  error_message << "Incorrect method set in matrix-matrix multiply: "
1152  << "method=" << method << " not allowed";
1153 
1154  throw OomphLibError(error_message.str(),
1155 OOMPH_CURRENT_FUNCTION,
1156  OOMPH_EXCEPTION_LOCATION);
1157  }
1158 
1159  result.build_without_copy(Value, Row_index, Column_start, Nnz, N, M);
1160 }
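// An illustrative sketch (editorial example, made-up values): building two
// small compressed-column matrices and multiplying them. The matrix is
// A = [[1,2],[0,3]], stored column by column; the flag
// Matrix_matrix_multiply_method (default 2) selects which of the three
// algorithms above is used.
//
//   Vector<double> val(3);  val[0]=1.0; val[1]=2.0; val[2]=3.0;
//   Vector<int> r_index(3); r_index[0]=0; r_index[1]=0; r_index[2]=1;
//   Vector<int> c_start(3); c_start[0]=0; c_start[1]=1; c_start[2]=3;
//   CCDoubleMatrix a(val,r_index,c_start,2,2);
//   CCDoubleMatrix b(val,r_index,c_start,2,2);
//   CCDoubleMatrix c;
//   a.multiply(b,c);   // c = A*A = [[1,8],[0,9]]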
1161 
1162 
1163 
1164 //=================================================================
1165 /// For every row, find the maximum absolute value of the
1166 /// entries in this row. Set all values that are less than alpha times
1167 /// this maximum to zero and return the resulting matrix in
1168 /// reduced_matrix. Note: Diagonal entries are retained regardless
1169 /// of their size.
1170 //=================================================================
1171 void CCDoubleMatrix::matrix_reduction(const double &alpha,
1172  CCDoubleMatrix &reduced_matrix)
1173 {
1174  // number of columns in matrix
1175  long n_coln=ncol();
1176 
1177  Vector<double>max_row(nrow(),0.0);
1178 
1179  // Here's the packed format for the new matrix
1180  Vector<int> B_row_start(1);
1181  Vector<int> B_column_index;
1182  Vector<double> B_value;
1183 
1184 
1185  // k is counter for the number of entries in the reduced matrix
1186  unsigned k=0;
1187 
1188  // Initialise row start
1189  B_row_start[0]=0;
1190 
1191  // Loop over columns
1192  for(long i=0;i<n_coln;i++)
1193  {
1194 
1195  //Loop over entries in columns
1196  for(long j=Column_start[i];j<Column_start[i+1];j++)
1197  {
1198 
1199  // Find max. value in row
1200  if(std::fabs(Value[j])>max_row[Row_index[j]])
1201  {
1202  max_row[Row_index[j]]=std::fabs(Value[j]);
1203  }
1204  }
1205 
1206  // Decide if we need to retain the entries in the row
1207  for(long j=Column_start[i];j<Column_start[i+1];j++)
1208  {
1209  // If we're on the diagonal or the value is sufficiently large: retain
1210  // i.e. copy across.
1211  if(i==Row_index[j] || std::fabs(Value[j])>alpha*max_row[Row_index[j]] )
1212  {
1213  B_value.push_back(Value[j]);
1214  B_column_index.push_back(Row_index[j]);
1215  k++;
1216  }
1217  }
1218  // This writes the row start for the next row -- equal
1219  // to the number of entries written so far (C++ zero-based indexing!)
1220  B_row_start.push_back(k);
1221  }
1222 
1223 
1224  // Build the matrix from the compressed format
1225  dynamic_cast<CCDoubleMatrix&>(reduced_matrix).
1226  build(B_value,B_column_index,B_row_start,nrow(),ncol());
1227  }
1228 
1229 
1230 
1231 ///////////////////////////////////////////////////////////////////////////////
1232 ///////////////////////////////////////////////////////////////////////////////
1233 ///////////////////////////////////////////////////////////////////////////////
1234 
1235 //=============================================================================
1236 /// Default constructor
1237 //=============================================================================
1238  CRDoubleMatrix::CRDoubleMatrix()
1239  {
1240  // set the default solver
1241  Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
1242 
1243  // matrix not built
1244  Built = false;
1245 
1246  // set the serial matrix-matrix multiply method
1247 #ifdef OOMPH_HAS_TRILINOS
1248 // Serial_matrix_matrix_multiply_method = 4;
1249  Serial_matrix_matrix_multiply_method = 2;
1250 #else
1251  Serial_matrix_matrix_multiply_method = 2;
1252 #endif
1253  }
1254 
1255 //=============================================================================
1256 /// Copy constructor
1257 //=============================================================================
1258 CRDoubleMatrix::CRDoubleMatrix(const CRDoubleMatrix& other_matrix)
1259 {
1260  // copy the distribution
1261  this->build_distribution(other_matrix.distribution_pt());
1262 
1263  // copy coefficients
1264  const double* values_pt = other_matrix.value();
1265  const int* column_indices = other_matrix.column_index();
1266  const int* row_start = other_matrix.row_start();
1267 
1268  // This is the local nnz.
1269  const unsigned nnz = other_matrix.nnz();
1270 
1271  // Using number of local rows since the underlying CRMatrix is local to
1272  // each processor.
1273  const unsigned nrow_local = other_matrix.nrow_local();
1274 
1275  // Storage for the (yet to be copied) data.
1276  double* my_values_pt = new double[nnz];
1277  int* my_column_indices = new int[nnz];
1278  int* my_row_start = new int[nrow_local+1];
1279 
1280  // Copying over the data.
1281  std::copy(values_pt,values_pt+nnz,my_values_pt);
1282  std::copy(column_indices,column_indices+nnz,my_column_indices);
1283  std::copy(row_start,row_start+nrow_local+1,my_row_start);
1284 
1285 
1286  // Build without copy since we have made a deep copy of the data structure.
1287  this->build_without_copy(other_matrix.ncol(),nnz,my_values_pt,
1288  my_column_indices,my_row_start);
1289 
1290  // set the default solver
1291  Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
1292 
1293  // matrix is built
1294  Built = true;
1295 
1296  // set the serial matrix-matrix multiply method
1297 #ifdef OOMPH_HAS_TRILINOS
1298 // Serial_matrix_matrix_multiply_method = 4;
1299  Serial_matrix_matrix_multiply_method = 2;
1300 #else
1301  Serial_matrix_matrix_multiply_method = 2;
1302 #endif
1303 }
1304 
1305 
1306 //=============================================================================
1307 /// Constructor: just stores the distribution but does not build the
1308 /// matrix
1309 //=============================================================================
1310  CRDoubleMatrix::CRDoubleMatrix(const LinearAlgebraDistribution*
1311  distribution_pt)
1312  {
1313  this->build_distribution(distribution_pt);
1314 
1315  // set the default solver
1316  Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
1317 
1318  // matrix not built
1319  Built = false;
1320 
1321 // set the serial matrix-matrix multiply method
1322 #ifdef OOMPH_HAS_TRILINOS
1323 // Serial_matrix_matrix_multiply_method = 4;
1324  Serial_matrix_matrix_multiply_method = 2;
1325 #else
1326  Serial_matrix_matrix_multiply_method = 2;
1327 #endif
1328 
1329  }
1330 
1331 //=============================================================================
1332 /// \short Constructor: Takes the distribution and the number of columns, as
1333 /// well as the vector of values, vector of column indices, vector of row
1334 /// starts.
1335 //=============================================================================
1336 CRDoubleMatrix::CRDoubleMatrix(const LinearAlgebraDistribution* dist_pt,
1337  const unsigned& ncol,
1338  const Vector<double>& value,
1339  const Vector<int>& column_index,
1340  const Vector<int>& row_start)
1341 {
1342  // build the compressed row matrix
1343  CR_matrix.build(value,column_index,row_start,dist_pt->nrow_local(),ncol);
1344 
1345  // store the Distribution
1346  this->build_distribution(dist_pt);
1347 
1348  // set the linear solver
1349  Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
1350 
1351  // set the serial matrix-matrix multiply method
1352 #ifdef OOMPH_HAS_TRILINOS
1353 // Serial_matrix_matrix_multiply_method = 4;
1354  Serial_matrix_matrix_multiply_method = 2;
1355 #else
1356  Serial_matrix_matrix_multiply_method = 2;
1357 #endif
1358 
1359  // matrix has been built
1360  Built = true;
1361 }
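// An illustrative sketch (editorial example; the helper name and values are
// hypothetical): assembling a small CRDoubleMatrix from compressed-row data.
// A communicator pointer is passed in so that a non-distributed
// LinearAlgebraDistribution over the two rows can be constructed. The matrix
// is A = [[1,2],[0,3]]:
//
//   void example_build_cr_matrix(const OomphCommunicator* comm_pt)
//   {
//    LinearAlgebraDistribution dist(comm_pt,2,false);
//    Vector<double> val(3);  val[0]=1.0; val[1]=2.0; val[2]=3.0;
//    Vector<int> c_index(3); c_index[0]=0; c_index[1]=1; c_index[2]=1;
//    Vector<int> r_start(3); r_start[0]=0; r_start[1]=2; r_start[2]=3;
//    CRDoubleMatrix a(&dist,2,val,c_index,r_start);
//   }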
1362 
1363 
1364 //=============================================================================
1365 /// Destructor
1366 //=============================================================================
1367 CRDoubleMatrix::~CRDoubleMatrix()
1368 {
1369  this->clear();
1370  delete Default_linear_solver_pt;
1371  Default_linear_solver_pt = 0;
1372 }
1373 
1374 //=============================================================================
1375 /// Rebuild the matrix - assembles an empty matrix with a defined distribution
1376 //=============================================================================
1377 void CRDoubleMatrix::build(const LinearAlgebraDistribution* distribution_pt)
1378 {
1379  this->clear();
1380  this->build_distribution(distribution_pt);
1381 }
1382 
1383 //=============================================================================
1384 /// \short Runs through the column index vector and checks if the entries
1385 /// are arranged arbitrarily or if they follow the regular lexicographical
1386 /// ordering of matrices. If a boolean argument is provided with the value
1387 /// TRUE then information on the first entry which is not in the correct
1388 /// position will also be given
1389 //=============================================================================
1390  bool CRDoubleMatrix::entries_are_sorted(const bool& doc_unordered_entries) const
1391  {
1392 #ifdef OOMPH_HAS_MPI
1393  // We only need to produce a warning if the matrix is distributed
1394  if (this->distributed())
1395  {
1396  // Create an ostringstream object to store the warning message
1397  std::ostringstream warning_message;
1398 
1399  // Create the warning message
1400  warning_message << "This method currently works for serial but "
1401  << "has not been implemented for use with MPI!\n";
1402 
1403  // Issue the warning
1404  OomphLibWarning(warning_message.str(),
1405  OOMPH_CURRENT_FUNCTION,
1406  OOMPH_EXCEPTION_LOCATION);
1407  }
1408 #endif
1409 
1410  // Get the number of rows in this matrix
1411  unsigned n_rows=this->nrow();
1412 
1413  // Acquire access to the value, row_start and column_index arrays from
1414  // the CR matrix. Since we do not change anything in row_start_pt we
1415  // give it the const prefix
1416  const int* column_index_pt=this->column_index();
1417  const int* row_start_pt=this->row_start();
1418 
1419  // Loop over the rows of matrix
1420  for (unsigned i=0;i<n_rows;i++)
1421  {
1422  // Calculate the number of nonzeros in the i-th row
1423  unsigned nnz_row_i=*(row_start_pt+i+1)-*(row_start_pt+i);
1424 
1425  // Get the index of the first entry in row i
1426  unsigned i_row_start=*(row_start_pt+i);
1427 
1428  // Loop over the entries of the i-th row
1429  for (unsigned j=0;j<nnz_row_i-1;j++)
1430  {
1431  // Check if the column index of the following entry is greater than the
1432  // current entry
1433  if ((*(column_index_pt+i_row_start+j+1))<
1434  (*(column_index_pt+i_row_start+j)))
1435  {
1436  // If the input argument was set to TRUE, we output information on
1437  // the first entry which is not in the correct position
1438  if (doc_unordered_entries)
1439  {
1440  // Tell the user
1441  oomph_info << "Matrix has not been correctly sorted!"
1442  << "\nOn row: " << i
1443  << "\nEntry: " << j
1444  << "\nEntry 1 = " << *(column_index_pt+i_row_start+j)
1445  << "\nEntry 2 = " << *(column_index_pt+i_row_start+j+1)
1446  << std::endl;
1447  }
1448 
1449  // It hasn't worked
1450  return false;
1451  } // if ((*(column_index_pt+i_row_start+j+1)) ...
1452  } // for (unsigned j=0;j<nnz_row_i-1;j++)
1453  } // for (unsigned i=0;i<n_rows;i++)
1454 
1455  // If it gets here without a warning then the entries in each row of
1456  // the matrix are ordered by increasing column index
1457  return true;
1458  } // End of entries_are_sorted()
1459 
1460 //=============================================================================
1461 /// \short This helper function sorts the entries in the column index vector
1462 /// and the value vector. During the construction of the matrix the entries
1463 /// were most likely assigned in an arbitrary order. As a result, it cannot
1464 /// be assumed that the entries in the column index vector corresponding to
1465 /// each row of the matrix have been arranged in increasing order. During
1466 /// the setup an additional vector will be set up; Index_of_diagonal_entries.
1467 /// The i-th entry of this vector contains the index of the last entry
1468 /// below or on the diagonal. If there are no entries below or on the
1469 /// diagonal then the corresponding entry is -1. If, however, there are
1470 /// no entries in the row then the entry is irrelevant and is kept
1471 /// as the initialised value; 0.
1472 //=============================================================================
1473  void CRDoubleMatrix::sort_entries()
1474  {
1475 #ifdef OOMPH_HAS_MPI
1476  // We only need to produce a warning if the matrix is distributed
1477  if (this->distributed())
1478  {
1479  // Create an ostringstream object to store the warning message
1480  std::ostringstream warning_message;
1481 
1482  // Create the warning message
1483  warning_message << "This method currently works for serial but "
1484  << "has not been tested with MPI!\n";
1485 
1486  // Issue the warning
1487  OomphLibWarning(warning_message.str(),
1488  OOMPH_CURRENT_FUNCTION,
1489  OOMPH_EXCEPTION_LOCATION);
1490  }
1491 #endif
1492 
1493  // Get the number of rows in the matrix
1494  unsigned n_rows=this->nrow();
1495 
1496  // Acquire access to the value, row_start and column_index arrays from
1497  // the CR matrix. Since we do not change anything in row_start_pt we
1498  // give it the const prefix
1499  double* value_pt=this->value();
1500  int* column_index_pt=this->column_index();
1501  const int* row_start_pt=this->row_start();
1502 
1503  // Resize the Index_of_diagonal_entries vector
1504  Index_of_diagonal_entries.resize(n_rows,0);
1505 
1506  // Vector of pairs to store the column_index of each value in the i-th row
1507  // and its corresponding matrix entry
1508  Vector<std::pair<int,double> > column_index_and_value_row_i;
1509 
1510  // Loop over the rows of the matrix
1511  for (unsigned i=0;i<n_rows;i++)
1512  {
1513  // Find the number of nonzeros in the i-th row
1514  unsigned nnz_row_i=*(row_start_pt+i+1)-*(row_start_pt+i);
1515 
1516  // Variable to store the start of the i-th row
1517  unsigned i_row_start=*(row_start_pt+i);
1518 
1519  // If there are no nonzeros in this row then the i-th entry of the vector
1520  // Index_of_diagonal_entries is irrelevant so we can simply let it be 0
1521  if (nnz_row_i==0)
1522  {
1523  // Set the i-th entry
1524  Index_of_diagonal_entries[i]=0;
1525  }
1526  // If there are nonzeros in the i-th row
1527  else
1528  {
1529  // If there is more than one entry in the row resize the vector
1530  // column_index_and_value_row_i
1531  column_index_and_value_row_i.resize(nnz_row_i);
1532 
1533  // Loop over the entries in the row
1534  for (unsigned j=0;j<nnz_row_i;j++)
1535  {
1536  // Assign the appropriate entries to column_index_and_value_row_i
1537  column_index_and_value_row_i[j]=
1538  std::make_pair(*(column_index_pt+i_row_start+j),
1539  *(value_pt+i_row_start+j));
1540  }
1541 
1542  // Sort the vector of pairs using the struct CRDoubleMatrixComparisonHelper
1543  std::sort(column_index_and_value_row_i.begin(),
1544  column_index_and_value_row_i.end(),
1545  Comparison_struct);
1546 
1547  //-----------------------------------------------------------------------
1548  // Now that the entries of the i-th row have been sorted we can read them
1549  // back into value_pt and column_index_pt:
1550  //-----------------------------------------------------------------------
1551 
1552  // Create a boolean variable to indicate whether or not the i-th entry of
1553  // Index_of_diagonal_entries has been set
1554  bool is_ith_entry_set=false;
1555 
1556  // Loop over the entries in the vector column_index_and_value_row_i and
1557  // assign its entries to value_pt and column_index_pt
1558  for (unsigned j=0;j<nnz_row_i;j++)
1559  {
1560  // Set the column index of the j-th nonzero value in the i-th row of
1561  // the matrix
1562  *(column_index_pt+i_row_start+j)=column_index_and_value_row_i[j].first;
1563 
1564  // Set the value of the j-th nonzero value in the i-th row of
1565  // the matrix
1566  *(value_pt+i_row_start+j)=column_index_and_value_row_i[j].second;
1567 
1568  // This if statement is used to set the i-th entry of the vector
1569  // Index_of_diagonal_entries if it has not yet been set
1570  if (!is_ith_entry_set)
1571  {
1572  // If the column index of the first entry in row i is greater than the
1573  // row number then the first entry must lie above the diagonal
1574  if (unsigned(*(column_index_pt+i_row_start))>i)
1575  {
1576  // If the column index of the first entry in the row is greater than
1577  // the row number, i, then the i-th entry of Index_of_diagonal_entries
1578  // needs to be set to -1 to indicate there are no entries below or on
1579  // the diagonal
1580  Index_of_diagonal_entries[i]=-1;
1581 
1582  // Indicate that the i-th entry of Index_of_diagonal_entries has
1583  // been set
1584  is_ith_entry_set=true;
1585  }
1586  // If there are entries below or on the diagonal
1587  else
1588  {
1589  // If there is only one entry in the row then we know that this will
1590  // be the last entry below or on the diagonal because we have
1591  // eliminated the possibility that if there is only one entry,
1592  // that it lies above the diagonal
1593  if (nnz_row_i==1)
1594  {
1595  // Set the index of the current entry to be the value of i-th entry
1596  // of Index_of_diagonal_entries
1597  Index_of_diagonal_entries[i]=i_row_start+j;
1598 
1599  // Indicate that the i-th entry of Index_of_diagonal_entries has
1600  // been set
1601  is_ith_entry_set=true;
1602  }
1603  // It remains to now check the case that there is more than one
1604  // entry in the row. If there is more than one entry in the row
1605  // and there are entries below or on the diagonal then we have
1606  // three cases:
1607  // (1) The current entry lies on the diagonal;
1608  // (2) The current entry lies above the diagonal;
1609  // (3) The current entry lies below the diagonal;
1610  // The first case can easily be checked as done below. If the second
1611  // case occurs then we have just passed the last entry. We know this
1612  // because at least one entry lies on or below the diagonal. If the
1613  // second case it true then we need to assign the previous entry to
1614  // the vector Index_of_diagonal_entries. Finally, we are left with
1615  // case (3), which can be split into two cases:
1616  // (3.1) The current entry lies below the diagonal but it
1617  // is not the last entry below or on the diagonal;
1618  // (3.2) The current entry lies below the diagonal and is
1619  // the last entry below or on the diagonal.
1620  // If case (3.1) holds then we can simply wait until we get to the
1621  // next entry in the row and examine that. If the next entry lies
1622  // on the diagonal then it will be swept up by case (1). If the
1623  // next entry lies above the diagonal then case (2) will sweep it up
1624  // and if neither is the case then we wait until the next entry and
1625  // so on. If, instead, case (3.2) holds then our last check simply
1626  // needs to check if the current entry is the last entry in the row
1627  // because if the last entry lies on the diagonal, case (1) will
1628  // sweep it up. If it lies above the diagonal, case (2) will take
1629  // care of it. Therefore, the only remaining case is that it lies
1630  // strictly below the diagonal and since it is the last entry in
1631  // the row it means the index of this entry needs to be assigned
1632  // to Index_of_diagonal_entries
1633 
1634  // Case (1) : The current entry lies on the diagonal
1635  else if (unsigned(*(column_index_pt+i_row_start+j))==i)
1636  {
1637  // Set the index of the current entry to be the value of i-th entry
1638  // of Index_of_diagonal_entries
1639  Index_of_diagonal_entries[i]=i_row_start+j;
1640 
1641  // Indicate that the i-th entry of Index_of_diagonal_entries has
1642  // been set
1643  is_ith_entry_set=true;
1644  }
1645  // Case (2) : The current entry lies above the diagonal
1646  else if (unsigned(*(column_index_pt+i_row_start+j))>i)
1647  {
1648  // Set the index of the current entry to be the value of i-th entry
1649  // of Index_of_diagonal_entries
1650  Index_of_diagonal_entries[i]=i_row_start+j-1;
1651 
1652  // Indicate that the i-th entry of Index_of_diagonal_entries has
1653  // been set
1654  is_ith_entry_set=true;
1655  }
1656  // Case (3.2) : The current entry is the last entry in the row
1657  else if (j==nnz_row_i-1)
1658  {
1659  // Set the index of the current entry to be the value of i-th entry
1660  // of Index_of_diagonal_entries
1661  Index_of_diagonal_entries[i]=i_row_start+j;
1662 
1663  // Indicate that the i-th entry of Index_of_diagonal_entries has
1664  // been set
1665  is_ith_entry_set=true;
1666  } // if (nnz_row_i==1) else if
1667  } // if (*(column_index_pt+i_row_start)>i)
1668  } // if (!is_ith_entry_set)
1669  } // for (unsigned j=0;j<nnz_row_i;j++)
1670  } // if (nnz_row_i==0) else
1671  } // for (unsigned i=0;i<n_rows;i++)
1672  } // End of sort_entries()
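// An illustrative sketch (editorial example; the helper name is
// hypothetical): entries_are_sorted() and sort_entries() are typically used
// together, e.g. before algorithms that require the column indices within
// each row to increase monotonically.
//
//   void ensure_sorted(CRDoubleMatrix& matrix)
//   {
//    if (!matrix.entries_are_sorted())
//     {
//      matrix.sort_entries();
//     }
//   }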
1673 
1674 //=============================================================================
1675 /// Clean method
1676 //=============================================================================
1677 void CRDoubleMatrix::clear()
1678 {
1679  this->clear_distribution();
1680  CR_matrix.clean_up_memory();
1681  Built = false;
1682 
1683  if(Linear_solver_pt != 0) // Only clean up if it exists
1684   Linear_solver_pt->clean_up_memory();
1685 }
1686 
1687 //=============================================================================
1688 /// \short build method: Takes the distribution and the number of columns, as
1689 /// well as the vector of values, vector of column indices, vector of row
1690 /// starts.
1691 //=============================================================================
1692 void CRDoubleMatrix::build(const LinearAlgebraDistribution* distribution_pt,
1693  const unsigned& ncol,
1694  const Vector<double>& value,
1695  const Vector<int>& column_index,
1696  const Vector<int>& row_start)
1697 {
1698  // clear
1699  this->clear();
1700 
1701  // store the Distribution
1702  this->build_distribution(distribution_pt);
1703 
1704  // set the linear solver
1706 
1707  // now build the matrix
1708  this->build(ncol,value,column_index,row_start);
1709 }
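 
// Example usage (illustrative sketch only, not part of the original source;
// "comm_pt" is assumed to be a pointer to a valid OomphCommunicator):
//
//  // Compressed-row data for the 2x2 matrix [[1,2],[0,3]]
//  Vector<double> value(3); value[0]=1.0; value[1]=2.0; value[2]=3.0;
//  Vector<int> column_index(3);
//  column_index[0]=0; column_index[1]=1; column_index[2]=1;
//  Vector<int> row_start(3); row_start[0]=0; row_start[1]=2; row_start[2]=3;
//
//  // Non-distributed distribution with two global rows
//  LinearAlgebraDistribution dist(comm_pt,2,false);
//
//  // Build the matrix (2 columns)
//  CRDoubleMatrix matrix;
//  matrix.build(&dist,2,value,column_index,row_start);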
1710 
1711 //=============================================================================
1712 /// \short method to rebuild the matrix, but not the distribution
1713 //=============================================================================
1714 void CRDoubleMatrix::build(const unsigned& ncol,
1715  const Vector<double>& value,
1716  const Vector<int>& column_index,
1717  const Vector<int>& row_start)
1718 {
1719  // call the underlying build method
1720  CR_matrix.clean_up_memory();
1721  CR_matrix.build(value,column_index,row_start,
1722  this->nrow_local(),ncol);
1723 
 1724  // matrix has been built
1725  Built = true;
1726 }
1727 
1728 //=============================================================================
1729 /// \short method to rebuild the matrix, but not the distribution
1730 //=============================================================================
 1731 void CRDoubleMatrix::build_without_copy(const unsigned& ncol,
 1732  const unsigned& nnz,
1733  double* value,
1734  int* column_index,
1735  int* row_start)
1736 {
1737  // call the underlying build method
1738  CR_matrix.clean_up_memory();
1739  CR_matrix.build_without_copy(value,column_index,row_start,nnz,
1740  this->nrow_local(),ncol);
1741 
 1742  // matrix has been built
1743  Built = true;
1744 }
1745 
1746 //=============================================================================
1747 /// Do LU decomposition
1748 //=============================================================================
 1749 void CRDoubleMatrix::ludecompose()
 1750 {
1751 #ifdef PARANOID
 1752  // check that this matrix is built
1753  if (!Built)
1754  {
1755  std::ostringstream error_message_stream;
1756  error_message_stream
1757  << "This matrix has not been built.";
1758  throw OomphLibError(error_message_stream.str(),
1759 OOMPH_CURRENT_FUNCTION,
1760  OOMPH_EXCEPTION_LOCATION);
1761  }
1762 #endif
1763 
 1764  // factorise using SuperLU (or SuperLU_DIST if oomph-lib was built with MPI)
1765  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
1766 
1767 }
1768 
1769 //=============================================================================
1770 /// Do back-substitution
1771 //=============================================================================
 1772 void CRDoubleMatrix::lubksub(DoubleVector &rhs)
 1773 {
1774 #ifdef PARANOID
1775  // check that the rhs vector is setup
1776  if (!rhs.built())
1777  {
1778  std::ostringstream error_message_stream;
1779  error_message_stream
1780  << "The vector rhs has not been setup";
1781  throw OomphLibError(error_message_stream.str(),
1782 OOMPH_CURRENT_FUNCTION,
1783  OOMPH_EXCEPTION_LOCATION);
1784  }
1785  // check that the rhs vector has the same distribution as this matrix
1786  if (!(*this->distribution_pt() == *rhs.distribution_pt()))
1787  {
1788  std::ostringstream error_message_stream;
1789  error_message_stream
1790  << "The vector rhs must have the same distribution as the matrix";
1791  throw OomphLibError(error_message_stream.str(),
1792 OOMPH_CURRENT_FUNCTION,
1793  OOMPH_EXCEPTION_LOCATION);
1794  }
1795 #endif
1796 
1797  // backsub
1798  DoubleVector rhs_copy(rhs);
1799  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->backsub(rhs_copy,rhs);
1800 }
1801 
1802 //=============================================================================
1803 /// Multiply the matrix by the vector x
1804 //=============================================================================
 1805 void CRDoubleMatrix::multiply(const DoubleVector &x, DoubleVector &soln) const
 1806 {
1807 #ifdef PARANOID
1808  // check that this matrix is built
1809  if (!Built)
1810  {
1811  std::ostringstream error_message_stream;
1812  error_message_stream
1813  << "This matrix has not been built";
1814  throw OomphLibError(error_message_stream.str(),
1815 OOMPH_CURRENT_FUNCTION,
1816  OOMPH_EXCEPTION_LOCATION);
1817  }
1818  // check that the distribution of x is setup
1819  if (!x.built())
1820  {
1821  std::ostringstream error_message_stream;
1822  error_message_stream
1823  << "The distribution of the vector x must be setup";
1824  throw OomphLibError(error_message_stream.str(),
1825 OOMPH_CURRENT_FUNCTION,
1826  OOMPH_EXCEPTION_LOCATION);
1827  }
1828  // Check to see if x.size() = ncol().
1829  if (this->ncol() != x.distribution_pt()->nrow())
1830  {
1831  std::ostringstream error_message_stream;
1832  error_message_stream
1833  << "The number of rows in the x vector and the number of columns in the "
1834  << "matrix must be the same";
1835  throw OomphLibError(error_message_stream.str(),
1836 OOMPH_CURRENT_FUNCTION,
1837  OOMPH_EXCEPTION_LOCATION);
1838  }
1839  // if the soln is distributed
1840  if (soln.built())
1841  {
1842  if (!(*soln.distribution_pt() == *this->distribution_pt()))
1843  {
1844  std::ostringstream error_message_stream;
1845  error_message_stream
1846  << "The soln vector is setup and therefore must have the same "
1847  << "distribution as the matrix";
1848  throw OomphLibError(error_message_stream.str(),
1849 OOMPH_CURRENT_FUNCTION,
1850  OOMPH_EXCEPTION_LOCATION);
1851  }
1852  }
1853 #endif
1854 
1855  // if soln is not setup then setup the distribution
1856  if (!soln.built())
1857  {
1858  // Resize and initialize the solution vector
1859  soln.build(this->distribution_pt(),0.0);
1860  }
1861 
1862  // Initialise
1863  soln.initialise(0.0);
1864 
1865  // if distributed and on more than one processor use trilinos
1866  // otherwise use the oomph-lib methods
1867  if (this->distributed() &&
1868  this->distribution_pt()->communicator_pt()->nproc() > 1)
1869  {
1870 #ifdef OOMPH_HAS_TRILINOS
1871  // This will only work if we have trilinos on board
1872  TrilinosEpetraHelpers::multiply(this,x,soln);
1873 #else
1874  std::ostringstream error_message_stream;
1875  error_message_stream
1876  << "Matrix-vector product on multiple processors with distributed "
1877  << "CRDoubleMatrix requires Trilinos.";
1878  throw OomphLibError(error_message_stream.str(),
1879 OOMPH_CURRENT_FUNCTION,
1880  OOMPH_EXCEPTION_LOCATION);
1881 #endif
1882  }
1883  else
1884  {
1885  unsigned n = this->nrow();
1886  const int* row_start = CR_matrix.row_start();
1887  const int* column_index = CR_matrix.column_index();
1888  const double* value = CR_matrix.value();
1889  double* soln_pt = soln.values_pt();
1890  const double* x_pt = x.values_pt();
1891  for (unsigned long i=0;i<n;i++)
1892  {
1893  soln_pt[i] = 0.0;
1894  for (long k=row_start[i];k<row_start[i+1];k++)
1895  {
1896  unsigned long j=column_index[k];
1897  double a_ij=value[k];
1898  soln_pt[i]+=a_ij*x_pt[j];
1899  }
1900  }
1901  }
1902 }
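 
// Example usage (illustrative sketch, following on from the 2x2 matrix
// [[1,2],[0,3]] built in the example above): compute soln = A x for
// x = [1,1]^T.
//
//  DoubleVector x(matrix.distribution_pt(),1.0);
//  DoubleVector soln;
//  matrix.multiply(x,soln);   // soln = [3,3]^T for A = [[1,2],[0,3]]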
1903 
1904 //=================================================================
1905 /// Multiply the transposed matrix by the vector x: soln=A^T x
1906 //=================================================================
 1907 void CRDoubleMatrix::multiply_transpose(const DoubleVector &x,
 1908  DoubleVector &soln) const
1909 {
1910 #ifdef PARANOID
1911  // check that this matrix is built
1912  if (!Built)
1913  {
1914  std::ostringstream error_message_stream;
1915  error_message_stream
1916  << "This matrix has not been built";
1917  throw OomphLibError(error_message_stream.str(),
1918 OOMPH_CURRENT_FUNCTION,
1919  OOMPH_EXCEPTION_LOCATION);
1920  }
 1921  // check that x has the same distribution as this matrix
1922  if (!(*this->distribution_pt() == *x.distribution_pt()))
1923  {
1924  std::ostringstream error_message_stream;
1925  error_message_stream
1926  << "The x vector and this matrix must have the same distribution.";
1927  throw OomphLibError(error_message_stream.str(),
1928 OOMPH_CURRENT_FUNCTION,
1929  OOMPH_EXCEPTION_LOCATION);
1930  }
 1931  // if soln is setup then it must have as many rows as this matrix has columns
1932  if (soln.built())
1933  {
1934  if (soln.distribution_pt()->nrow() != this->ncol())
1935  {
1936  std::ostringstream error_message_stream;
1937  error_message_stream
1938  << "The soln vector is setup and therefore must have the same "
1939  << "number of rows as the vector x";
1940  throw OomphLibError(error_message_stream.str(),
1941 OOMPH_CURRENT_FUNCTION,
1942  OOMPH_EXCEPTION_LOCATION);
1943  }
1944  }
1945 #endif
1946 
1947  // if soln is not setup then setup the distribution
1948  if (!soln.built())
1949  {
1950  LinearAlgebraDistribution* dist_pt =
 1951  new LinearAlgebraDistribution(this->distribution_pt()->communicator_pt(),
 1952  this->ncol(),this->distributed());
1953  soln.build(dist_pt,0.0);
1954  delete dist_pt;
1955  }
1956 
1957  // Initialise
1958  soln.initialise(0.0);
1959 
1960  if (this->distributed() &&
1961  this->distribution_pt()->communicator_pt()->nproc() > 1)
1962  {
1963 #ifdef OOMPH_HAS_TRILINOS
1964  // This will only work if we have trilinos on board
1965  TrilinosEpetraHelpers::multiply(this,x,soln);
1966 #else
1967  std::ostringstream error_message_stream;
1968  error_message_stream
1969  << "Matrix-vector product on multiple processors with distributed "
1970  << "CRDoubleMatrix requires Trilinos.";
1971  throw OomphLibError(error_message_stream.str(),
1972 OOMPH_CURRENT_FUNCTION,
1973  OOMPH_EXCEPTION_LOCATION);
1974 #endif
1975  }
1976  else
1977  {
1978  unsigned n = this->nrow();
1979  const int* row_start = CR_matrix.row_start();
1980  const int* column_index = CR_matrix.column_index();
1981  const double* value = CR_matrix.value();
1982  double* soln_pt = soln.values_pt();
1983  const double* x_pt = x.values_pt();
1984  // Matrix vector product
1985  for (unsigned long i=0;i<n;i++)
1986  {
1987  for (long k=row_start[i];k<row_start[i+1];k++)
1988  {
1989  unsigned long j=column_index[k];
1990  double a_ij=value[k];
1991  soln_pt[j]+=a_ij*x_pt[i];
1992  }
1993  }
1994  }
1995 }
1996 
1997 //===========================================================================
1998 /// Function to multiply this matrix by the CRDoubleMatrix matrix_in.
 1999 /// For a serial matrix, there are five methods available:
2000 /// Method 1: First runs through this matrix and matrix_in to find the storage
2001 /// requirements for result - arrays of the correct size are
2002 /// then allocated before performing the calculation.
2003 /// Minimises memory requirements but more costly.
2004 /// Method 2: Grows storage for values and column indices of result 'on the
2005 /// fly' using an array of maps. Faster but more memory
2006 /// intensive.
2007 /// Method 3: Grows storage for values and column indices of result 'on the
2008 /// fly' using a vector of vectors. Not particularly impressive
2009 /// on the platforms we tried...
2010 /// Method 4: Trilinos Epetra Matrix Matrix multiply.
 2011 /// Method 5: Trilinos Epetra Matrix Matrix Multiply (ML-based)
2012 /// If Trilinos is installed then Method 4 is employed by default, otherwise
2013 /// Method 2 is employed by default.
 2014 /// For a distributed matrix, only the Trilinos Epetra Matrix Matrix multiply
2015 /// is available.
2016 //=============================================================================
 2017 void CRDoubleMatrix::multiply(const CRDoubleMatrix& matrix_in,
 2018  CRDoubleMatrix& result) const
2019 {
2020 #ifdef PARANOID
2021  // check that this matrix is built
2022  if (!Built)
2023  {
2024  std::ostringstream error_message_stream;
2025  error_message_stream
2026  << "This matrix has not been built";
2027  throw OomphLibError(error_message_stream.str(),
2028  OOMPH_CURRENT_FUNCTION,
2029  OOMPH_EXCEPTION_LOCATION);
2030  }
2031  // check that this matrix is built
2032  if (!matrix_in.built())
2033  {
2034  std::ostringstream error_message_stream;
2035  error_message_stream
2036  << "This matrix matrix_in has not been built";
2037  throw OomphLibError(error_message_stream.str(),
2038  OOMPH_CURRENT_FUNCTION,
2039  OOMPH_EXCEPTION_LOCATION);
2040  }
 2041  // if result is setup then it must have the same distribution as this matrix
2042  if (result.built())
2043  {
2044  if (!(*result.distribution_pt() == *this->distribution_pt()))
2045  {
2046  std::ostringstream error_message_stream;
2047  error_message_stream
2048  << "The matrix result is setup and therefore must have the same "
2049  << "distribution as the vector x";
2050  throw OomphLibError(error_message_stream.str(),
2051  OOMPH_CURRENT_FUNCTION,
2052  OOMPH_EXCEPTION_LOCATION);
2053  }
2054  }
2055 #endif
2056 
2057  // if the result has not been setup, then store the distribution
2058  if (!result.distribution_built())
2059  {
2060  result.build(this->distribution_pt());
2061  }
2062 
2063  // short name for Serial_matrix_matrix_multiply_method
2064  unsigned method = Serial_matrix_matrix_multiply_method;
2065 
2066  // if this matrix is not distributed and matrix in is not distributed
2067  if (!this->distributed() && !matrix_in.distributed() &&
2068  ((method == 1) || (method == 2) || (method == 3)))
2069  {
2070  // NB N is number of rows!
2071  unsigned long N = this->nrow();
2072  unsigned long M = matrix_in.ncol();
2073  unsigned long Nnz = 0;
2074 
2075  // pointers to arrays which store result
2076  int* Row_start = 0;
2077  double* Value = 0;
2078  int* Column_index = 0;
2079 
2080  // get pointers to matrix_in
2081  const int* matrix_in_row_start = matrix_in.row_start();
2082  const int* matrix_in_column_index = matrix_in.column_index();
2083  const double* matrix_in_value = matrix_in.value();
2084 
2085  // get pointers to this matrix
2086  const double* this_value = this->value();
2087  const int* this_row_start = this->row_start();
2088  const int* this_column_index = this->column_index();
2089 
2090  //clock_t clock1 = clock();
2091 
2092  // METHOD 1
2093  // --------
2094  if (method==1)
2095  {
2096  // allocate storage for row starts
2097  Row_start = new int[N+1];
2098  Row_start[0]=0;
2099 
 2100  // a set to store the column indices of the non-zeros in each row of result
2101  std::set<unsigned> columns;
2102 
2103  // run through rows of this matrix and matrix_in to find number of
2104  // non-zero entries in each row of result
2105  for (unsigned long this_row = 0; this_row<N; this_row++)
2106  {
2107  // run through non-zeros in this_row of this matrix
2108  for (int this_ptr = this_row_start[this_row];
2109  this_ptr < this_row_start[this_row+1];
2110  this_ptr++)
2111  {
2112  // find column index for non-zero
2113  int matrix_in_row = this_column_index[this_ptr];
2114 
2115  // run through corresponding row in matrix_in
2116  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2117  matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
2118  matrix_in_ptr++)
2119  {
2120  // find column index for non-zero in matrix_in and store in columns
2121  columns.insert(matrix_in_column_index[matrix_in_ptr]);
2122  }
2123  }
2124  // update Row_start
2125  Row_start[this_row+1] = Row_start[this_row] + columns.size();
2126 
2127  // wipe values in columns
2128  columns.clear();
2129  }
2130 
2131  // set Nnz
2132  Nnz = Row_start[N];
2133 
2134  // allocate arrays for result
2135  Value = new double[Nnz];
2136  Column_index = new int[Nnz];
2137 
2138  // set all values of Column_index to -1
2139  for (unsigned long i=0; i<Nnz; i++)
2140  {
2141  Column_index[i] = -1;
2142  }
2143 
2144  // Calculate values for result - first run through rows of this matrix
2145  for (unsigned long this_row = 0; this_row<N; this_row++)
2146  {
2147  // run through non-zeros in this_row
2148  for (int this_ptr = this_row_start[this_row];
2149  this_ptr < this_row_start[this_row+1];
2150  this_ptr++)
2151  {
2152  // find value of non-zero
2153  double this_val = this_value[this_ptr];
2154 
2155  // find column associated with non-zero
2156  int matrix_in_row = this_column_index[this_ptr];
2157 
2158  // run through corresponding row in matrix_in
2159  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2160  matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
2161  matrix_in_ptr++)
2162  {
2163  // find column index for non-zero in matrix_in
2164  int col = matrix_in_column_index[matrix_in_ptr];
2165 
2166  // find position in result to insert value
2167  for(int ptr = Row_start[this_row];
2168  ptr <= Row_start[this_row+1];
2169  ptr++)
2170  {
2171  if (ptr == Row_start[this_row+1])
2172  {
2173  // error - have passed end of row without finding
2174  // correct column
2175  std::ostringstream error_message;
2176  error_message << "Error inserting value in result";
2177 
2178  throw OomphLibError(error_message.str(),
2179  OOMPH_CURRENT_FUNCTION,
2180  OOMPH_EXCEPTION_LOCATION);
2181  }
2182  else if ( Column_index[ptr] == -1 )
2183  {
2184  // first entry for this column index
2185  Column_index[ptr] = col;
2186  Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
2187  break;
2188  }
2189  else if ( Column_index[ptr] == col )
2190  {
2191  // column index already exists - add value
2192  Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
2193  break;
2194  }
2195  }
2196  }
2197  }
2198  }
2199  }
2200 
2201  // METHOD 2
2202  // --------
2203  else if (method==2)
2204  {
2205  // generate array of maps to store values for result
2206  std::map<int,double>* result_maps = new std::map<int,double>[N];
2207 
2208  // run through rows of this matrix
2209  for (unsigned long this_row = 0; this_row<N; this_row++)
2210  {
2211  // run through non-zeros in this_row
2212  for (int this_ptr = this_row_start[this_row];
2213  this_ptr < this_row_start[this_row+1];
2214  this_ptr++)
2215  {
2216  // find value of non-zero
2217  double this_val = this_value[this_ptr];
2218 
2219  // find column index associated with non-zero
2220  int matrix_in_row = this_column_index[this_ptr];
2221 
2222  // run through corresponding row in matrix_in
2223  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2224  matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
2225  matrix_in_ptr++)
2226  {
2227  // find column index for non-zero in matrix_in
2228  int col = matrix_in_column_index[matrix_in_ptr];
2229 
2230  // insert value
2231  result_maps[this_row][col] += this_val * matrix_in_value[matrix_in_ptr];
2232  }
2233  }
2234  }
2235 
2236  // allocate Row_start
2237  Row_start = new int[N+1];
2238 
2239  // copy across row starts
2240  Row_start[0] = 0;
2241  for (unsigned long row=0; row<N; row++)
2242  {
2243  int size = result_maps[row].size();
2244  Row_start[row+1] = Row_start[row] + size;
2245  }
2246 
2247  // set Nnz
2248  Nnz = Row_start[N];
2249 
2250  // allocate other arrays
2251  Value = new double[Nnz];
2252  Column_index = new int[Nnz];
2253 
2254  // copy values and column indices
2255  for (unsigned long row=0; row<N; row++)
2256  {
2257  unsigned ptr = Row_start[row];
2258  for (std::map<int,double>::iterator i = result_maps[row].begin();
2259  i != result_maps[row].end();
2260  i ++)
2261  {
2262  Column_index[ptr]= i->first;
2263  Value[ptr] = i->second;
2264  ptr++;
2265  }
2266  }
2267 
2268  // tidy up memory
2269  delete[] result_maps;
2270  }
2271 
2272  // METHOD 3
2273  // --------
2274  else if (method==3)
2275  {
2276  // vectors of vectors to store results
2277  std::vector< std::vector<int> > result_cols(N);
2278  std::vector< std::vector<double> > result_vals(N);
2279 
2280  // run through the rows of this matrix
2281  for (unsigned long this_row = 0; this_row<N; this_row++)
2282  {
2283  // run through non-zeros in this_row
2284  for (int this_ptr = this_row_start[this_row];
2285  this_ptr < this_row_start[this_row+1];
2286  this_ptr++)
2287  {
2288  // find value of non-zero
2289  double this_val = this_value[this_ptr];
2290 
2291  // find column index associated with non-zero
2292  int matrix_in_row = this_column_index[this_ptr];
2293 
2294  // run through corresponding row in matrix_in
2295  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2296  matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
2297  matrix_in_ptr++)
2298  {
2299  // find column index for non-zero in matrix_in
2300  int col = matrix_in_column_index[matrix_in_ptr];
2301 
2302  // insert value
2303  int size = result_cols[this_row].size();
2304  for (int i = 0; i<=size; i++)
2305  {
2306  if (i==size)
2307  {
2308  // first entry for this column
2309  result_cols[this_row].push_back(col);
2310  result_vals[this_row].push_back(
2311  this_val*matrix_in_value[matrix_in_ptr]);
2312  }
2313  else if (col==result_cols[this_row][i])
2314  {
2315  // column already exists
2316  result_vals[this_row][i] += this_val *
2317  matrix_in_value[matrix_in_ptr];
2318  break;
2319  }
2320  }
2321  }
2322  }
2323  }
2324 
2325  // allocate Row_start
2326  Row_start = new int[N+1];
2327 
2328  // copy across row starts
2329  Row_start[0] = 0;
2330  for (unsigned long row=0; row<N; row++)
2331  {
2332  int size = result_cols[row].size();
2333  Row_start[row+1] = Row_start[row] + size;
2334  }
2335 
2336  // set Nnz
2337  Nnz = Row_start[N];
2338 
2339  // allocate other arrays
2340  Value = new double[Nnz];
2341  Column_index = new int[Nnz];
2342 
2343  // copy across values and column indices
2344  for (unsigned long row=0; row<N; row++)
2345  {
2346  unsigned ptr = Row_start[row];
2347  unsigned nnn=result_cols[row].size();
2348  for (unsigned i = 0; i < nnn; i++)
2349  {
2350  Column_index[ptr] = result_cols[row][i];
2351  Value[ptr] = result_vals[row][i];
2352  ptr++;
2353  }
2354  }
2355  }
2356 
2357  // build
2358  result.build_without_copy(M, Nnz, Value, Column_index, Row_start);
2359  }
2360 
2361  // else we have to use trilinos
2362  else
2363  {
2364 #ifdef OOMPH_HAS_TRILINOS
2365  bool use_ml = false;
2366  if (method == 5)
2367  {
2368  use_ml = true;
2369  }
2370  TrilinosEpetraHelpers::multiply(*this,matrix_in,result,use_ml);
2371 #else
2372  std::ostringstream error_message;
2373  error_message << "Serial_matrix_matrix_multiply_method = "
2374  << Serial_matrix_matrix_multiply_method
2375  << " requires trilinos.";
2376  throw OomphLibError(error_message.str(),
2377  OOMPH_CURRENT_FUNCTION,
2378  OOMPH_EXCEPTION_LOCATION);
2379 #endif
2380  }
2381 }
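 
// Example usage (illustrative sketch): form the product C = A*B for two
// compatible serial CRDoubleMatrix objects A and B. The storage scheme
// used for the serial product is selected via the
// Serial_matrix_matrix_multiply_method flag documented above.
//
//  CRDoubleMatrix C;
//  A.multiply(B,C);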
2382 
2383 
2384 
2385 //=================================================================
2386 /// For every row, find the maximum absolute value of the
2387 /// entries in this row. Set all values that are less than alpha times
2388 /// this maximum to zero and return the resulting matrix in
2389 /// reduced_matrix. Note: Diagonal entries are retained regardless
2390 /// of their size.
2391 //=================================================================
2392 void CRDoubleMatrix::matrix_reduction(const double &alpha,
2393  CRDoubleMatrix &reduced_matrix)
2394 {
2395  // number of rows in matrix
2396  long n_row=nrow_local();
2397  double max_row;
2398 
2399  // Here's the packed format for the new matrix
2400  Vector<int> B_row_start(1);
2401  Vector<int> B_column_index;
2402  Vector<double> B_value;
2403 
2404  // get pointers to the underlying data
2405  const int* row_start = CR_matrix.row_start();
2406  const int* column_index = CR_matrix.column_index();
2407  const double* value = CR_matrix.value();
2408 
2409  // k is counter for the number of entries in the reduced matrix
2410  unsigned k=0;
2411 
2412  // Initialise row start
2413  B_row_start[0]=0;
2414 
2415  // Loop over rows
2416  for(long i=0;i<n_row;i++)
2417  {
2418  // Initialise max value in row
2419  max_row=0.0;
2420 
2421  //Loop over entries in columns
2422  for(long j=row_start[i];j<row_start[i+1];j++)
2423  {
2424  // Find max. value in row
2425  if(std::fabs(value[j])>max_row)
2426  {
2427  max_row=std::fabs(value[j]);
2428  }
2429  }
2430 
2431  // Decide if we need to retain the entries in the row
2432  for(long j=row_start[i];j<row_start[i+1];j++)
2433  {
2434  // If we're on the diagonal or the value is sufficiently large: retain
2435  // i.e. copy across.
2436  if(i==column_index[j] || std::fabs(value[j])>alpha*max_row )
2437  {
2438  B_value.push_back(value[j]);
2439  B_column_index.push_back(column_index[j]);
2440  k++;
2441  }
2442  }
 2443  // This writes the row start for the next row -- equal to
 2444  // the number of entries written so far (C++ zero-based indexing!)
2445  B_row_start.push_back(k);
2446  }
2447 
2448  // Build the matrix from the compressed format
2449  dynamic_cast<CRDoubleMatrix&>(reduced_matrix).
2450  build(this->ncol(),B_value,B_column_index,B_row_start);
2451  }
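 
// Example usage (illustrative sketch): drop every off-diagonal entry whose
// magnitude is below 10% of the largest absolute value in its row.
//
//  CRDoubleMatrix reduced(matrix.distribution_pt());
//  matrix.matrix_reduction(0.1,reduced);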
2452 
2453 //=============================================================================
2454 /// if this matrix is distributed then the equivalent global matrix is built
2455 /// using new and returned. The calling method is responsible for the
2456 /// destruction of the new matrix.
2457 //=============================================================================
 2458 CRDoubleMatrix* CRDoubleMatrix::global_matrix() const
 2459 {
2460 #ifdef OOMPH_HAS_MPI
2461  // if this matrix is not distributed then this method is redundant
2462  if (!this->distributed() ||
2463  this->distribution_pt()->communicator_pt()->nproc() == 1)
2464  {
2465  return new CRDoubleMatrix(*this);
2466  }
2467 
2468  // nnz
2469  int nnz = this->nnz();
2470 
2471  // my nrow local
2472  unsigned nrow_local = this->nrow_local();
2473 
2474  // nrow global
2475  unsigned nrow = this->nrow();
2476 
2477  // cache nproc
2478  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2479 
2480  // get the nnzs on the other processors
2481  int* dist_nnz_pt = new int[nproc];
2482  MPI_Allgather(&nnz,1,MPI_INT,
2483  dist_nnz_pt,1,MPI_INT,
2484  this->distribution_pt()->communicator_pt()->mpi_comm());
2485 
2486  // create a int vector of first rows and nrow local and compute nnz global
2487  int* dist_first_row = new int[nproc];
2488  int* dist_nrow_local = new int[nproc];
2489  int nnz_global = 0;
2490  for (int p = 0; p < nproc; p++)
2491  {
2492  nnz_global += dist_nnz_pt[p];
2493  dist_first_row[p] = this->first_row(p);
2494  dist_nrow_local[p] = this->nrow_local(p);
2495  }
2496 
2497  // compute the offset for the values and column index data
2498  int* nnz_offset = new int[nproc];
2499  nnz_offset[0] = 0;
2500  for (int p = 1; p < nproc; p++)
2501  {
2502  nnz_offset[p] = nnz_offset[p-1] + dist_nnz_pt[p-1];
2503  }
2504 
2505  // get pointers to the (current) distributed data
2506  // const_cast required because MPI requires non-const data when sending
2507  // data
2508  int* dist_row_start = const_cast<int*>(this->row_start());
2509  int* dist_column_index = const_cast<int*>(this->column_index());
2510  double* dist_value = const_cast<double*>(this->value());
2511 
2512  // space for the global matrix
2513  int* global_row_start = new int[nrow+1];
2514  int* global_column_index = new int[nnz_global];
2515  double* global_value = new double[nnz_global];
2516 
2517  // get the row starts
2518  MPI_Allgatherv(dist_row_start,nrow_local,MPI_INT,
2519  global_row_start,dist_nrow_local,dist_first_row,MPI_INT,
2520  this->distribution_pt()->communicator_pt()->mpi_comm());
2521 
2522  // get the column indexes
2523  MPI_Allgatherv(dist_column_index,nnz,MPI_INT,
2524  global_column_index,dist_nnz_pt,nnz_offset,MPI_INT,
2525  this->distribution_pt()->communicator_pt()->mpi_comm());
2526 
2527  // get the values
2528  MPI_Allgatherv(dist_value,nnz,MPI_DOUBLE,
2529  global_value,dist_nnz_pt,nnz_offset,MPI_DOUBLE,
2530  this->distribution_pt()->communicator_pt()->mpi_comm());
2531 
2532  // finally the last row start
2533  global_row_start[nrow] = nnz_global;
2534 
2535  // update the other row start
2536  for (int p = 0; p < nproc; p++)
2537  {
2538  for (int i = 0; i < dist_nrow_local[p]; i++)
2539  {
2540  unsigned j = dist_first_row[p] + i;
2541  global_row_start[j]+=nnz_offset[p];
2542  }
2543  }
2544 
2545  // create the global distribution
2546  LinearAlgebraDistribution* dist_pt = new
2547  LinearAlgebraDistribution(this->distribution_pt()->communicator_pt(),nrow,false);
2548 
2549  // create the matrix
2550  CRDoubleMatrix* matrix_pt = new CRDoubleMatrix(dist_pt);
2551 
2552  // copy of distribution taken so delete
2553  delete dist_pt;
2554 
2555  // pass data into matrix
2556  matrix_pt->build_without_copy(this->ncol(),nnz_global,global_value,
2557  global_column_index,global_row_start);
2558 
2559  // clean up
 2560  delete[] dist_first_row;
 2561  delete[] dist_nrow_local;
 2562  delete[] nnz_offset;
 2563  delete[] dist_nnz_pt;
2564 
2565  // and return
2566  return matrix_pt;
2567 #else
2568  return new CRDoubleMatrix(*this);
2569 #endif
2570 }
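 
// Example usage (illustrative sketch): gather a distributed matrix into a
// non-distributed global copy on every processor; the caller owns the
// returned matrix and must delete it.
//
//  CRDoubleMatrix* global_matrix_pt = distributed_matrix.global_matrix();
//  // ... use global_matrix_pt ...
//  delete global_matrix_pt;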
2571 
2572  //============================================================================
2573  /// The contents of the matrix are redistributed to match the new
2574  /// distribution. In a non-MPI build this method does nothing.
2575  /// \b NOTE 1: The current distribution and the new distribution must have
2576  /// the same number of global rows.
2577  /// \b NOTE 2: The current distribution and the new distribution must have
2578  /// the same Communicator.
2579  //============================================================================
 2580  void CRDoubleMatrix::redistribute(const LinearAlgebraDistribution*
 2581  const& dist_pt)
2582  {
2583 #ifdef OOMPH_HAS_MPI
2584 #ifdef PARANOID
 2585  // paranoid check that the number of global rows is the same for
 2586  // both distributions
2587  if (dist_pt->nrow() != this->distribution_pt()->nrow())
2588  {
2589  std::ostringstream error_message;
2590  error_message << "The number of global rows in the new distribution ("
2591  << dist_pt->nrow() << ") is not equal to the number"
2592  << " of global rows in the current distribution ("
2593  << this->distribution_pt()->nrow() << ").\n";
2594  throw OomphLibError(error_message.str(),
2595  OOMPH_CURRENT_FUNCTION,
2596  OOMPH_EXCEPTION_LOCATION);
2597  }
2598  // paranoid check that the current distribution and the new distribution
2599  // have the same Communicator
2600  OomphCommunicator temp_comm(*dist_pt->communicator_pt());
2601  if (!(temp_comm == *this->distribution_pt()->communicator_pt()))
2602  {
2603  std::ostringstream error_message;
2604  error_message << "The new distribution and the current distribution must "
2605  << "have the same communicator.";
2606  throw OomphLibError(error_message.str(),
2607  OOMPH_CURRENT_FUNCTION,
2608  OOMPH_EXCEPTION_LOCATION);
2609  }
 2610  // paranoid check that the matrix is built
2611  if (!this->built())
2612  {
2613  std::ostringstream error_message;
 2614  error_message << "The matrix must be built to be redistributed";
2615  throw OomphLibError(error_message.str(),
2616  OOMPH_CURRENT_FUNCTION,
2617  OOMPH_EXCEPTION_LOCATION);
2618  }
2619 #endif
2620 
2621  // if the two distributions are not the same
2622  // =========================================
2623  if (!((*this->distribution_pt()) == *dist_pt))
2624  {
2625  // Get the number of columns to build the matrix.
2626  unsigned long ncol = this->ncol();
2627 
2628  // current data
2629  int* current_row_start = this->row_start();
2630  int* current_column_index = this->column_index();
2631  double* current_value = this->value();
2632 
2633  // get the rank and the number of processors
2634  int my_rank = this->distribution_pt()->communicator_pt()->my_rank();
2635  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2636 
2637  // if both distributions are distributed
2638  // =====================================
2639  if (this->distributed() && dist_pt->distributed())
2640  {
2641 
2642  // new nrow_local and first_row data
2643  Vector<unsigned> new_first_row(nproc);
2644  Vector<unsigned> new_nrow_local(nproc);
2645  Vector<unsigned> current_first_row(nproc);
2646  Vector<unsigned> current_nrow_local(nproc);
2647  for (int i = 0; i < nproc; i++)
2648  {
2649  new_first_row[i] = dist_pt->first_row(i);
2650  new_nrow_local[i] = dist_pt->nrow_local(i);
2651  current_first_row[i] = this->first_row(i);
2652  current_nrow_local[i] = this->nrow_local(i);
2653  }
2654 
2655  // compute which local rows are expected to be received from each
2656  // processor / sent to each processor
2657  Vector<unsigned> first_row_for_proc(nproc,0);
2658  Vector<unsigned> nrow_local_for_proc(nproc,0);
2659  Vector<unsigned> first_row_from_proc(nproc,0);
2660  Vector<unsigned> nrow_local_from_proc(nproc,0);
2661 
 2662  // for every processor compute the first_row and nrow_local that will
 2663  // be sent and received by this processor
2664  for (int p = 0; p < nproc; p++)
2665  {
2666  // start with data to be sent
2667  if ((new_first_row[p] < (current_first_row[my_rank] +
2668  current_nrow_local[my_rank])) &&
2669  (current_first_row[my_rank] < (new_first_row[p] +
2670  new_nrow_local[p])))
2671  {
2672  first_row_for_proc[p] =
2673  std::max(current_first_row[my_rank],
2674  new_first_row[p]);
2675  nrow_local_for_proc[p] =
2676  std::min((current_first_row[my_rank] +
2677  current_nrow_local[my_rank]),
2678  (new_first_row[p] +
2679  new_nrow_local[p])) - first_row_for_proc[p];
2680  }
2681 
2682  // and data to be received
2683  if ((new_first_row[my_rank] < (current_first_row[p] +
2684  current_nrow_local[p]))
2685  && (current_first_row[p] < (new_first_row[my_rank] +
2686  new_nrow_local[my_rank])))
2687  {
2688  first_row_from_proc[p] =
2689  std::max(current_first_row[p],
2690  new_first_row[my_rank]);
2691  nrow_local_from_proc[p] =
2692  std::min((current_first_row[p] +
2693  current_nrow_local[p]),
2694  (new_first_row[my_rank] +
2695  new_nrow_local[my_rank]))-first_row_from_proc[p];
2696  }
2697  }
2698 
2699  // determine how many nnzs to send to each processor
2700  Vector<unsigned> nnz_for_proc(nproc,0);
2701  for (int p = 0; p < nproc; p++)
2702  {
2703  if (nrow_local_for_proc[p] > 0)
2704  {
2705  nnz_for_proc[p] = (current_row_start[first_row_for_proc[p]-
2706  current_first_row[my_rank]+
2707  nrow_local_for_proc[p]]-
2708  current_row_start[first_row_for_proc[p]-
2709  current_first_row[my_rank]]);
2710  }
2711  }
2712 
2713  // next post non-blocking sends and recv for the nnzs
2714  Vector<unsigned> nnz_from_proc(nproc,0);
2715  Vector<MPI_Request> send_req;
2716  Vector<MPI_Request> nnz_recv_req;
2717  for (int p = 0; p < nproc; p++)
2718  {
2719  if (p != my_rank)
2720  {
2721  // send
2722  if (nrow_local_for_proc[p] > 0)
2723  {
2724  MPI_Request req;
2725  MPI_Isend(&nnz_for_proc[p],1,MPI_UNSIGNED,p,0,
2726  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
2727  send_req.push_back(req);
2728  }
2729 
2730  // recv
2731  if (nrow_local_from_proc[p] > 0)
2732  {
2733  MPI_Request req;
2734  MPI_Irecv(&nnz_from_proc[p],1,MPI_UNSIGNED,p,0,
2735  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
2736  nnz_recv_req.push_back(req);
2737  }
2738  }
2739  // "send to self"
2740  else
2741  {
2742  nnz_from_proc[p] = nnz_for_proc[p];
2743  }
2744  }
2745 
2746  // allocate new storage for the new row_start
2747  int* new_row_start = new int[new_nrow_local[my_rank]+1];
2748 
2749  // wait for recvs to complete
2750  unsigned n_recv_req = nnz_recv_req.size();
2751  if (n_recv_req > 0)
2752  {
2753  Vector<MPI_Status> recv_status(n_recv_req);
2754  MPI_Waitall(n_recv_req,&nnz_recv_req[0],&recv_status[0]);
2755  }
2756 
2757  // compute the nnz offset for each processor
2758  unsigned next_row = 0;
2759  unsigned nnz_count = 0;
2760  Vector<unsigned> nnz_offset(nproc,0);
2761  for (int p = 0; p < nproc; p++)
2762  {
2763  unsigned pp = 0;
2764  while (new_first_row[pp] != next_row) { pp++; }
2765  nnz_offset[pp] = nnz_count;
2766  nnz_count+= nnz_from_proc[pp];
2767  next_row += new_nrow_local[pp];
2768  }
2769 
2770  // allocate storage for the values and column indices
2771  int* new_column_index = new int[nnz_count];
2772  double* new_value = new double[nnz_count];
2773 
2774  // post the sends and recvs for the matrix data
2775  Vector<MPI_Request> recv_req;
2776  MPI_Aint base_address;
2777  MPI_Get_address(new_value,&base_address);
2778  for (int p = 0; p < nproc; p++)
2779  {
2780  // communicated with other processors
2781  if (p != my_rank)
2782  {
2783  // SEND
2784  if (nrow_local_for_proc[p] > 0)
2785  {
2786  // array of datatypes
2787  MPI_Datatype types[3];
2788 
2789  // array of offsets
2790  MPI_Aint offsets[3];
2791 
2792  // array of lengths
2793  int len[3];
2794 
2795  // row start
2796  unsigned first_row_to_send = first_row_for_proc[p] -
2797  current_first_row[my_rank];
2798  MPI_Type_contiguous(nrow_local_for_proc[p],MPI_INT,
2799  &types[0]);
2800  MPI_Type_commit(&types[0]);
2801  len[0] = 1;
2802  MPI_Get_address(current_row_start+first_row_to_send,&offsets[0]);
2803  offsets[0] -= base_address;
2804 
2805  // values
2806  unsigned first_coef_to_send
2807  = current_row_start[first_row_to_send];
2808  MPI_Type_contiguous(nnz_for_proc[p],MPI_DOUBLE,
2809  &types[1]);
2810  MPI_Type_commit(&types[1]);
2811  len[1] = 1;
2812  MPI_Get_address(current_value+first_coef_to_send,&offsets[1]);
2813  offsets[1] -= base_address;
2814 
2815  // column index
 2816  MPI_Type_contiguous(nnz_for_proc[p],MPI_INT,
2817  &types[2]);
2818  MPI_Type_commit(&types[2]);
2819  len[2] = 1;
2820  MPI_Get_address(current_column_index+first_coef_to_send,&offsets[2]);
2821  offsets[2] -= base_address;
2822 
2823  // build the combined datatype
2824  MPI_Datatype send_type;
2825  MPI_Type_create_struct(3,len,offsets,types,&send_type);
2826  MPI_Type_commit(&send_type);
2827  MPI_Type_free(&types[0]);
2828  MPI_Type_free(&types[1]);
2829  MPI_Type_free(&types[2]);
2830 
2831  // and send
2832  MPI_Request req;
2833  MPI_Isend(new_value,1,send_type,p,1,
2834  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
2835  send_req.push_back(req);
2836  MPI_Type_free(&send_type);
2837  }
2838 
2839  // RECV
2840  if (nrow_local_from_proc[p] > 0)
2841  {
2842  // array of datatypes
2843  MPI_Datatype types[3];
2844 
2845  // array of offsets
2846  MPI_Aint offsets[3];
2847 
2848  // array of lengths
2849  int len[3];
2850 
2851  // row start
2852  unsigned first_row_to_recv = first_row_from_proc[p] -
2853  new_first_row[my_rank];
2854  MPI_Type_contiguous(nrow_local_from_proc[p],MPI_INT,
2855  &types[0]);
2856  MPI_Type_commit(&types[0]);
2857  len[0] = 1;
2858  MPI_Get_address(new_row_start+first_row_to_recv,&offsets[0]);
2859  offsets[0] -= base_address;
2860 
2861  // values
2862  unsigned first_coef_to_recv = nnz_offset[p];
2863  MPI_Type_contiguous(nnz_from_proc[p],MPI_DOUBLE,
2864  &types[1]);
2865  MPI_Type_commit(&types[1]);
2866  len[1] = 1;
2867  MPI_Get_address(new_value+first_coef_to_recv,&offsets[1]);
2868  offsets[1] -= base_address;
2869 
2870  // column index
2871  MPI_Type_contiguous(nnz_from_proc[p],MPI_INT,
2872  &types[2]);
2873  MPI_Type_commit(&types[2]);
2874  len[2] = 1;
2875  MPI_Get_address(new_column_index+first_coef_to_recv,&offsets[2]);
2876  offsets[2] -= base_address;
2877 
2878  // build the combined datatype
2879  MPI_Datatype recv_type;
2880  MPI_Type_create_struct(3,len,offsets,types,&recv_type);
2881  MPI_Type_commit(&recv_type);
2882  MPI_Type_free(&types[0]);
2883  MPI_Type_free(&types[1]);
2884  MPI_Type_free(&types[2]);
2885 
 2886  // and receive
2887  MPI_Request req;
2888  MPI_Irecv(new_value,1,recv_type,p,1,
2889  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
2890  recv_req.push_back(req);
2891  MPI_Type_free(&recv_type);
2892  }
2893  }
 2894  // otherwise transfer data internally
2895  else
2896  {
2897  unsigned j = first_row_for_proc[my_rank] -
2898  current_first_row[my_rank];
2899  unsigned k = first_row_from_proc[my_rank] -
2900  new_first_row[my_rank];
2901  for (unsigned i = 0; i < nrow_local_for_proc[my_rank]; i++)
2902  {
2903  new_row_start[k+i] = current_row_start[j+i];
2904  }
2905  unsigned first_coef_to_send = current_row_start[j];
2906  for (unsigned i = 0; i < nnz_for_proc[my_rank]; i++)
2907  {
2908  new_value[nnz_offset[p]+i]=current_value[first_coef_to_send+i];
2909  new_column_index[nnz_offset[p]+i]
2910  =current_column_index[first_coef_to_send+i];
2911  }
2912  }
2913  }
2914 
2915  // wait for all recvs to complete
2916  n_recv_req = recv_req.size();
2917  if (n_recv_req > 0)
2918  {
2919  Vector<MPI_Status> recv_status(n_recv_req);
2920  MPI_Waitall(n_recv_req,&recv_req[0],&recv_status[0]);
2921  }
2922 
2923  // next we need to update the row starts
2924  for (int p = 0; p < nproc; p++)
2925  {
2926  if (nrow_local_from_proc[p] > 0)
2927  {
2928  unsigned first_row = first_row_from_proc[p]-new_first_row[my_rank];
2929  unsigned last_row = first_row + nrow_local_from_proc[p]-1;
2930  int update = nnz_offset[p] - new_row_start[first_row];
2931  for (unsigned i = first_row; i <= last_row; i++)
2932  {
2933  new_row_start[i]+=update;
2934  }
2935  }
2936  }
2937  new_row_start[dist_pt->nrow_local()] = nnz_count;
2938 
2939  // wait for sends to complete
2940  unsigned n_send_req = send_req.size();
 2941  if (n_send_req > 0)
 2942  {
 2943  Vector<MPI_Status> send_status(n_send_req);
2944  MPI_Waitall(n_send_req,&send_req[0],&send_status[0]);
2945  }
2946  // if (my_rank == 0)
2947  // {
2948  // CRDoubleMatrix* m_pt = this->global_matrix();
2949  // m_pt->sparse_indexed_output("m1.dat");
2950  // }
2951 
2952  //
2953  this->build(dist_pt);
2954  this->build_without_copy(ncol,nnz_count,
2955  new_value,new_column_index,
2956  new_row_start);
2957  // if (my_rank == 0)
2958  // {
2959  // CRDoubleMatrix* m_pt = this->global_matrix();
2960  // m_pt->sparse_indexed_output("m2.dat");
2961  // }
2962 // this->sparse_indexed_output(oomph_info);
2964  }
2965 
2966  // if this matrix is distributed but the new distributed matrix is global
2967  // ======================================================================
2968  else if (this->distributed() && !dist_pt->distributed())
2969  {
2970 
2971  // nnz
2972  int nnz = this->nnz();
2973 
2974  // nrow global
2975  unsigned nrow = this->nrow();
2976 
2977  // cache nproc
2978  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2979 
2980  // get the nnzs on the other processors
2981  int* dist_nnz_pt = new int[nproc];
2982  MPI_Allgather(&nnz,1,MPI_INT,
2983  dist_nnz_pt,1,MPI_INT,
2984  this->distribution_pt()->communicator_pt()->mpi_comm());
2985 
2986  // create an int array of first rows and nrow local and
2987  // compute nnz global
2988  int* dist_first_row = new int[nproc];
2989  int* dist_nrow_local = new int[nproc];
2990  for (int p = 0; p < nproc; p++)
2991  {
2992  dist_first_row[p] = this->first_row(p);
2993  dist_nrow_local[p] = this->nrow_local(p);
2994  }
2995 
 2996  // compute the offset for the values and column index data
2997  // compute the nnz offset for each processor
2998  int next_row = 0;
2999  unsigned nnz_count = 0;
3000  Vector<unsigned> nnz_offset(nproc,0);
3001  for (int p = 0; p < nproc; p++)
3002  {
3003  unsigned pp = 0;
3004  while (dist_first_row[pp] != next_row) { pp++; }
3005  nnz_offset[pp] = nnz_count;
3006  nnz_count+=dist_nnz_pt[pp];
3007  next_row+=dist_nrow_local[pp];
3008  }
3009 
3010  // get pointers to the (current) distributed data
3011  int* dist_row_start = this->row_start();
3012  int* dist_column_index = this->column_index();
3013  double* dist_value = this->value();
3014 
3015  // space for the global matrix
3016  int* global_row_start = new int[nrow+1];
3017  int* global_column_index = new int[nnz_count];
3018  double* global_value = new double[nnz_count];
3019 
3020  // post the sends and recvs for the matrix data
3021  Vector<MPI_Request> recv_req;
3022  Vector<MPI_Request> send_req;
3023  MPI_Aint base_address;
3024  MPI_Get_address(global_value,&base_address);
3025 
3026  // SEND
3027  if (dist_nrow_local[my_rank] > 0)
3028  {
3029  // types
3030  MPI_Datatype types[3];
3031 
3032  // offsets
3033  MPI_Aint offsets[3];
3034 
3035  // lengths
3036  int len[3];
3037 
3038  // row start
3039  MPI_Type_contiguous(dist_nrow_local[my_rank],MPI_INT,&types[0]);
3040  MPI_Type_commit(&types[0]);
3041  MPI_Get_address(dist_row_start,&offsets[0]);
3042  offsets[0] -= base_address;
3043  len[0] = 1;
3044 
3045  // value
3046  MPI_Type_contiguous(nnz,MPI_DOUBLE,&types[1]);
3047  MPI_Type_commit(&types[1]);
3048  MPI_Get_address(dist_value,&offsets[1]);
3049  offsets[1] -= base_address;
3050  len[1] = 1;
3051 
3052  // column indices
3053  MPI_Type_contiguous(nnz,MPI_INT,&types[2]);
3054  MPI_Type_commit(&types[2]);
3055  MPI_Get_address(dist_column_index,&offsets[2]);
3056  offsets[2] -= base_address;
3057  len[2] = 1;
3058 
3059  // build the send type
3060  MPI_Datatype send_type;
3061  MPI_Type_create_struct(3,len,offsets,types,&send_type);
3062  MPI_Type_commit(&send_type);
3063  MPI_Type_free(&types[0]);
3064  MPI_Type_free(&types[1]);
3065  MPI_Type_free(&types[2]);
3066 
3067  // and send
3068  for (int p = 0; p < nproc; p++)
3069  {
3070  if (p != my_rank)
3071  {
3072  MPI_Request req;
3073  MPI_Isend(global_value,1,send_type,p,1,
3074  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
3075  send_req.push_back(req);
3076  }
3077  }
3078  MPI_Type_free(&send_type);
3079  }
3080 
3081  // RECV
3082  for (int p = 0; p < nproc; p++)
3083  {
3084  // communicated with other processors
3085  if (p != my_rank)
3086  {
3087  // RECV
3088  if (dist_nrow_local[p] > 0)
3089  {
3090 
3091  // types
3092  MPI_Datatype types[3];
3093 
3094  // offsets
3095  MPI_Aint offsets[3];
3096 
3097  // lengths
3098  int len[3];
3099 
3100  // row start
3101  MPI_Type_contiguous(dist_nrow_local[p],MPI_INT,&types[0]);
3102  MPI_Type_commit(&types[0]);
3103  MPI_Get_address(global_row_start+dist_first_row[p],&offsets[0]);
3104  offsets[0] -= base_address;
3105  len[0] = 1;
3106 
3107  // value
3108  MPI_Type_contiguous(dist_nnz_pt[p],MPI_DOUBLE,&types[1]);
3109  MPI_Type_commit(&types[1]);
3110  MPI_Get_address(global_value+nnz_offset[p],&offsets[1]);
3111  offsets[1] -= base_address;
3112  len[1] = 1;
3113 
3114  // column indices
3115  MPI_Type_contiguous(dist_nnz_pt[p],MPI_INT,&types[2]);
3116  MPI_Type_commit(&types[2]);
3117  MPI_Get_address(global_column_index+nnz_offset[p],&offsets[2]);
3118  offsets[2] -= base_address;
3119  len[2] = 1;
3120 
 3121  // build the receive type
3122  MPI_Datatype recv_type;
3123  MPI_Type_create_struct(3,len,offsets,types,&recv_type);
3124  MPI_Type_commit(&recv_type);
3125  MPI_Type_free(&types[0]);
3126  MPI_Type_free(&types[1]);
3127  MPI_Type_free(&types[2]);
3128 
 3129  // and receive
3130  MPI_Request req;
3131  MPI_Irecv(global_value,1,recv_type,p,1,
3132  this->distribution_pt()->communicator_pt()->mpi_comm(),&req);
3133  recv_req.push_back(req);
3134  MPI_Type_free(&recv_type);
3135  }
3136  }
3137  // otherwise send to self
3138  else
3139  {
3140  unsigned nrow_local = dist_nrow_local[my_rank];
3141  unsigned first_row = dist_first_row[my_rank];
3142  for (unsigned i = 0; i < nrow_local; i++)
3143  {
3144  global_row_start[first_row+i]=dist_row_start[i];
3145  }
3146  unsigned offset = nnz_offset[my_rank];
3147  for (int i = 0; i < nnz; i++)
3148  {
3149  global_value[offset+i]=dist_value[i];
3150  global_column_index[offset+i]=dist_column_index[i];
3151  }
3152  }
3153  }
3154 
3155  // wait for all recvs to complete
3156  unsigned n_recv_req = recv_req.size();
3157  if (n_recv_req > 0)
3158  {
3159  Vector<MPI_Status> recv_status(n_recv_req);
3160  MPI_Waitall(n_recv_req,&recv_req[0],&recv_status[0]);
3161  }
3162 
3163  // finally the last row start
3164  global_row_start[nrow] = nnz_count;
3165 
3166  // update the other row start
3167  for (int p = 0; p < nproc; p++)
3168  {
3169  for (int i = 0; i < dist_nrow_local[p]; i++)
3170  {
3171  unsigned j = dist_first_row[p] + i;
3172  global_row_start[j]+=nnz_offset[p];
3173  }
3174  }
3175 
3176  // wait for sends to complete
3177  unsigned n_send_req = send_req.size();
 3178  if (n_send_req > 0)
 3179  {
 3180  Vector<MPI_Status> send_status(n_send_req);
3181  MPI_Waitall(n_send_req,&send_req[0],&send_status[0]);
3182  }
3183 
3184  // rebuild the matrix
3185  LinearAlgebraDistribution* dist_pt = new
3186  LinearAlgebraDistribution(this->distribution_pt()->communicator_pt(),
3187  nrow,false);
3188  this->build(dist_pt);
3189  this->build_without_copy(ncol,nnz_count,
3190  global_value,global_column_index,
3191  global_row_start);
3192 
3193  // clean up
3194  delete dist_pt;
3195  delete[] dist_first_row;
3196  delete[] dist_nrow_local;
3197  delete[] dist_nnz_pt;
3198  }
3199 
 3200  // otherwise the matrix is not distributed but it needs to be turned
3201  // into a distributed matrix
3202  // =============================================================
3203  else if (!this->distributed() && dist_pt->distributed())
3204  {
3205 
3206  // cache the new nrow_local
3207  unsigned nrow_local = dist_pt->nrow_local();
3208 
3209  // and first_row
3210  unsigned first_row = dist_pt->first_row();
3211 
3212  // get pointers to the (current) distributed data
3213  int* global_row_start = this->row_start();
3214  int* global_column_index = this->column_index();
3215  double* global_value = this->value();
3216 
3217  // determine the number of non zeros required by this processor
3218  unsigned nnz = global_row_start[first_row+nrow_local] -
3219  global_row_start[first_row];
3220 
3221  // allocate
3222  int* dist_row_start = new int[nrow_local+1];
3223  int* dist_column_index = new int[nnz];
3224  double* dist_value = new double[nnz];
3225 
3226  // copy
3227  int offset = global_row_start[first_row];
3228  for (unsigned i = 0; i <= nrow_local; i++)
3229  {
3230  dist_row_start[i] = global_row_start[first_row+i]-offset;
3231  }
3232  for (unsigned i = 0; i < nnz; i++)
3233  {
3234  dist_column_index[i] = global_column_index[offset+i];
3235  dist_value[i] = global_value[offset+i];
3236  }
3237 
3238  // rebuild
3239  this->build(dist_pt);
3240  this->build_without_copy(ncol,nnz,dist_value,
3241  dist_column_index,dist_row_start);
3242  }
3243  }
3244 #endif
3245  }
3246 
3247 //=============================================================================
3248 /// Compute transpose of matrix
3249 //=============================================================================
 3250  void CRDoubleMatrix::get_matrix_transpose(CRDoubleMatrix* result) const
 3251  {
3252  // Get the number of non_zeros
3253  unsigned long nnon_zeros=this->nnz();
3254 
3255  // Find the number of rows and columns in the transposed
3256  // matrix. We differentiate these from those associated
3257  // with the original matrix by appending the characters
3258  // '_t' onto the end
3259  unsigned long n_rows=this->nrow();
3260  unsigned long n_rows_t=this->ncol();
3261 
3262 #ifdef OOMPH_HAS_MPI
3263  // We only need to produce a warning if the matrix is distributed
3264  if (this->distributed())
3265  {
3266  // Create an ostringstream object to store the warning message
3267  std::ostringstream warning_message;
3268 
 3269  // Create the warning message
3270  warning_message << "This method currently works for serial but "
3271  << "has not been tested with MPI!\n";
3272 
3273  // Issue the warning
3274  OomphLibWarning(warning_message.str(),
3275  OOMPH_CURRENT_FUNCTION,
3276  OOMPH_EXCEPTION_LOCATION);
3277  }
3278 #endif
3279 
3280  // Set up the distribution for the transposed matrix
3281  result->distribution_pt()->build(this->distribution_pt()->
3282  communicator_pt(),n_rows_t,false);
3283 
3284  // Acquire access to the value, row_start and column_index
3285  // arrays from the CR matrix
3286  const double* value_pt=this->value();
3287  const int* row_start_pt=this->row_start();
3288  const int* column_index_pt=this->column_index();
3289 
3290  // Allocate space for the row_start and column_index vectors
3291  // associated with the transpose of the current matrix.
3292  Vector<double> value_t(nnon_zeros,0.0);
3293  Vector<int> column_index_t(nnon_zeros,0);
3294  Vector<int> row_start_t(n_rows_t+1,0);
3295 
3296  // Loop over the column index vector and count how many times
3297  // each column number occurs and increment the appropriate
3298  // entry in row_start_t whose i+1'th entry will contain the
3299  // number of non-zeros in the i'th column of the matrix (this
3300  // is done so that the cumulative sum done next returns the
3301  // correct result)
3302  for (unsigned i=0;i<nnon_zeros;i++)
3303  {
3304  // Assign entries to row_start_t (noting the first
3305  // entry is left as 0 for the cumulative sum done later)
3306  row_start_t[*(column_index_pt+i)+1]++;
3307  }
3308 
3309  // Calculate the sum of the first i entries in the row_start_t
3310  // vector and store the values in the i'th entry of row_start_t
3311  for (unsigned i=1;i<n_rows_t+1;i++)
3312  {
3313  // Calculate the cumulative sum
3314  row_start_t[i]+=row_start_t[i-1];
3315  }
3316 
3317  // Allocate space for variables to store the indices of the
3318  // start and end of the non-zeros in a given row of the matrix
3319  unsigned i_row_s=0;
3320  unsigned i_row_e=0;
3321 
3322  // Initialise 3 extra variables for readability of the
3323  // code in the subsequent piece of code
3324  unsigned a=0;
3325  unsigned b=0;
3326  unsigned c=0;
3327 
3328  // Vector needed to count the number of entries added
3329  // to each segment in column_index_t where each segment
3330  // is associated with each row in the transposed matrix
3331  Vector<int> counter(n_rows_t,0);
3332 
3333  // Set the entries in column_index_t. To do this we loop
3334  // over each row of the original matrix (equivalently
3335  // the number of columns in the transpose)
3336  for (unsigned i_row=0;i_row<n_rows;i_row++)
3337  {
3338  // Here we find the indices of the start and end
3339  // of the non-zeros in i_row'th row of the matrix.
3340  // [Note, there should be a -1 on i_row_e but this
3341  // is ignored so that we use a strict inequality
3342  // in the subsequent for-loop!]
3343  i_row_s=*(row_start_pt+i_row);
3344  i_row_e=*(row_start_pt+i_row+1);
3345 
3346  // Loop over the entries in the i_row'th row
3347  // of the matrix
3348  for (unsigned j=i_row_s;j<i_row_e;j++)
3349  {
3350 
3351  // The value of a is the column index of the j'th
3352  // element in the i_row'th row of the original matrix
3353  // (which is also the row index of the associated
3354  // non-zero in the transposed matrix)
3355  a=*(column_index_pt+j);
3356 
3357  // The value of b will be used to find the start
3358  // of the appropriate segment of column_index_t
3359  // that the non-zero belongs in
3360  b=row_start_t[a];
3361 
3362  // Find the number of elements already added to
3363  // this segment (to find which entry of the segment
3364  // the value i_row, the column index of the non-zero
3365  // in the transposed matrix, should be assigned to)
3366  c=counter[*(column_index_pt+j)];
3367 
3368  // Assign the value i_row to the appropriate entry
3369  // in column_index_t
3370  column_index_t[b+c]=i_row;
3371  value_t[b+c]=*(value_pt+j);
3372 
 3373  // Increment the entry of the counter vector associated with this
 3374  // column to indicate that another non-zero has been added into
 3375  // the corresponding segment of column_index_t
3376  counter[*(column_index_pt+j)]++;
3377 
3378  } // End of for-loop over i_row'th row of the matrix
3379 
3380  } // End of for-loop over rows of the matrix
3381 
3382  // Build the matrix (note: the value of n_cols for the
3383  // transposed matrix is n_rows for the original matrix)
3384  result->build(n_rows,value_t,column_index_t,row_start_t);
3385 
3386  } // End of the function
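 
// Worked example (illustrative only, not part of the original source): for
// the 2x2 matrix A = [[1,2],[0,3]] with value = {1,2,3},
// column_index = {0,1,1} and row_start = {0,2,3}, the counting pass gives
// row_start_t = {0,1,2}, which the cumulative sum turns into {0,1,3}.
// Scattering each entry into its column's segment then yields
// value_t = {1,2,3}, column_index_t = {0,0,1} and row_start_t = {0,1,3},
// i.e. A^T = [[1,0],[2,3]].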
3387 
3388 
3389 //=============================================================================
3390 /// Compute infinity (maximum) norm of matrix
3391 //=============================================================================
 3392  double CRDoubleMatrix::inf_norm() const
 3393  {
3394 #ifdef PARANOID
 3395  // paranoid check that the matrix is setup
3396  if (!this->distribution_built())
3397  {
3398  std::ostringstream error_message;
3399  error_message << "This matrix must be setup.";
3400  throw OomphLibError(error_message.str(),
3401  OOMPH_CURRENT_FUNCTION,
3402  OOMPH_EXCEPTION_LOCATION);
3403  }
3404 #endif
3405 
3406  // compute the local norm
3407  unsigned nrow_local = this->nrow_local();
3408  double n = 0;
3409  const int* row_start = CR_matrix.row_start();
3410  const double* value = CR_matrix.value();
3411  for (unsigned i = 0; i < nrow_local; i++)
3412  {
3413  double a = 0;
3414  for (int j = row_start[i]; j < row_start[i+1]; j++)
3415  {
3416  a += fabs(value[j]);
3417  }
3418  n = std::max(n,a);
3419  }
3420 
 3421  // if this matrix is distributed and on multiple processors then gather
3422 #ifdef OOMPH_HAS_MPI
3423  double n2 = n;
3424  if ( this->distributed() &&
3425  this->distribution_pt()->communicator_pt()->nproc() > 1)
3426  {
3427  MPI_Allreduce(&n,&n2,1,MPI_DOUBLE,MPI_MAX,
3428  this->distribution_pt()->communicator_pt()->mpi_comm());
3429  }
3430  n = n2;
3431 #endif
3432 
3433  // and return
3434  return n;
3435  }
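 
// Worked example (illustrative only): for A = [[1,-2],[4,3]] the row sums
// of absolute values are |1|+|-2| = 3 and |4|+|3| = 7, so inf_norm()
// returns 7.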
3436 
3437 //=============================================================================
3438 /// Return the diagonal entries of the matrix.
3439 /// This only works with square matrices. This condition may be relaxed
3440 /// in the future if need be.
3441 //=============================================================================
 3442 Vector<double> CRDoubleMatrix::diagonal_entries() const
 3443 {
3444 
3445 #ifdef PARANOID
3446  // Check if the matrix has been built.
3447  if(!this->built())
3448  {
3449  std::ostringstream error_message;
3450  error_message << "The matrix has not been built.\n"
3451  << "Please build it...\n";
3452  throw OomphLibError(error_message.str(),
3453  OOMPH_CURRENT_FUNCTION,
3454  OOMPH_EXCEPTION_LOCATION);
3455  }
3456 
3457  // Check if this is a square matrix.
3458  if(this->nrow() != this->ncol())
3459  {
3460  std::ostringstream error_message;
3461  error_message << "The matrix is not square. Can only get the diagonal\n"
3462  << "entries of a square matrix.\n";
3463  throw OomphLibError(error_message.str(),
3464  OOMPH_CURRENT_FUNCTION,
3465  OOMPH_EXCEPTION_LOCATION);
3466  }
3467 #endif
3468 
3469  // We get the diagonal entries on this processor.
3470  unsigned nrow_local = this->nrow_local();
3471 
3472  // Create storage for the diagonal entries.
3473  Vector<double> result_vec;
3474  result_vec.reserve(nrow_local);
3475 
3476  // Get the first row for the column offset.
3477  unsigned first_row = this->first_row();
3478 
3479  // Loop through the local rows.
3480  for (unsigned i = 0; i < nrow_local; i++)
3481  {
3482  // The column entries are globally indexed.
3483  unsigned diag_entry_col = first_row + i;
3484 
3485  // Push back the diagonal entry.
3486  result_vec.push_back(CR_matrix.get_entry(i, diag_entry_col));
3487  }
3488 
3489  return result_vec;
3490 }
3491 
3492 //=============================================================================
3493 /// Element-wise addition of this matrix with matrix_in.
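///
/// A minimal usage sketch (hypothetical names "a" and "b"; both matrices
/// must already be built with identical distributions):
/// \code
/// CRDoubleMatrix sum;  // empty result matrix
/// a.add(b,sum);        // sum = a + b (element-wise)
/// \endcode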
3494 //=============================================================================
3495 void CRDoubleMatrix::add(const CRDoubleMatrix &matrix_in,
3496  CRDoubleMatrix &result_matrix) const
3497 {
3498 #ifdef PARANOID
3499  // Check if this matrix is built.
3500  if(!this->built())
3501  {
3502  std::ostringstream error_message;
3503  error_message << "The matrix is not built.\n"
3504  << "Please build the matrix!\n";
3505  throw OomphLibError(error_message.str(),
3506  OOMPH_CURRENT_FUNCTION,
3507  OOMPH_EXCEPTION_LOCATION);
3508  }
3509 
3510  // Check if matrix_in is built.
3511  if(!matrix_in.built())
3512  {
3513  std::ostringstream error_message;
3514  error_message << "The matrix matrix_in is not built.\n"
3515  << "Please build the matrix!\n";
3516  throw OomphLibError(error_message.str(),
3517  OOMPH_CURRENT_FUNCTION,
3518  OOMPH_EXCEPTION_LOCATION);
3519  }
3520 
3521  // Check if the dimensions of this matrix and matrix_in are the same.
3522  unsigned long this_nrow = this->nrow();
3523  unsigned long matrix_in_nrow = matrix_in.nrow();
3524  if(this_nrow != matrix_in_nrow)
3525  {
3526  std::ostringstream error_message;
3527  error_message << "matrix_in has a different number of rows than\n"
3528  << "this matrix.\n";
3529  throw OomphLibError(error_message.str(),
3530  OOMPH_CURRENT_FUNCTION,
3531  OOMPH_EXCEPTION_LOCATION);
3532  }
3533 
3534  unsigned long this_ncol = this->ncol();
3535  unsigned long matrix_in_ncol = matrix_in.ncol();
3536  if(this_ncol != matrix_in_ncol)
3537  {
3538  std::ostringstream error_message;
3539  error_message << "matrix_in has a different number of columns than\n"
3540  << "this matrix.\n";
3541  throw OomphLibError(error_message.str(),
3542  OOMPH_CURRENT_FUNCTION,
3543  OOMPH_EXCEPTION_LOCATION);
3544  }
3545 
3546  // Check if the distribution is the same (Otherwise we may have to send and
3547  // receive data from other processors - which is not implemented!)
3548  if(*(this->distribution_pt()) != *(matrix_in.distribution_pt()))
3549  {
3550  std::ostringstream error_message;
3551  error_message << "matrix_in must have the same distribution as\n"
3552  << "this matrix.\n";
3553  throw OomphLibError(error_message.str(),
3554  OOMPH_CURRENT_FUNCTION,
3555  OOMPH_EXCEPTION_LOCATION);
3556  }
3557 
3558  // If the result matrix is built, check that its existing distribution
3559  // is the same as that of matrix_in, since we'll re-use that
3560  // distribution instead of completely rebuilding it.
3561  if(result_matrix.built() &&
3562  (*result_matrix.distribution_pt() != *matrix_in.distribution_pt()))
3563  {
3564  std::ostringstream error_message;
3565  error_message << "The result_matrix is built. "
3566  << "But has a different distribution from matrix_in \n"
3567  << "They need to be the same.\n";
3568  throw OomphLibError(error_message.str(),
3569  OOMPH_CURRENT_FUNCTION,
3570  OOMPH_EXCEPTION_LOCATION);
3571  }
3572 #endif
3573 
3574  // To add the elements of two CRDoubleMatrices, we need to know the union of
3575  // the sparsity patterns. This is determined by the column indices.
3576  // Per row, we insert the column indices and values (entries) as
3577  // key-value pairs in a map. We then read these out into the column index
3578  // and value vectors for the result matrix.
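  // For example, if row i of this matrix holds the entries
  // {col 0: 1.0, col 2: 3.0} and row i of matrix_in holds
  // {col 2: 4.0, col 5: 1.0}, the map for that row becomes
  // {0: 1.0, 2: 7.0, 5: 1.0}, i.e. three non-zeros in the result.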
3579 
3580  unsigned nrow_local = this->nrow_local();
3581  Vector<int> res_column_indices;
3582  Vector<double> res_values;
3583  Vector<int> res_row_start;
3584  res_row_start.reserve(nrow_local+1);
3585 
3586  // The row_start and column_indices
3587  const int* this_column_indices = this->column_index();
3588  const int* this_row_start = this->row_start();
3589  const int* in_column_indices = matrix_in.column_index();
3590  const int* in_row_start = matrix_in.row_start();
3591 
3592  // Values from this matrix and matrix_in.
3593  const double* this_values = this->value();
3594  const double* in_values = matrix_in.value();
3595 
3596 
3597  // The first entry in row_start is always zero.
3598  res_row_start.push_back(0);
3599 
3600  // Loop through the rows of both matrices and insert the column indices and
3601  // values as a key-value pair.
3602  for (unsigned row_i = 0; row_i < nrow_local; row_i++)
3603  {
3604  // Create the map for this row.
3605  std::map<int,double> res_row_map;
3606 
3607  // Insert the column and value pair for this matrix.
3608  for (int i = this_row_start[row_i]; i < this_row_start[row_i+1]; i++)
3609  {
3610  res_row_map[this_column_indices[i]] = this_values[i];
3611  }
3612 
3613  // Insert the column and value pair for in matrix.
3614  for (int i = in_row_start[row_i]; i < in_row_start[row_i+1]; i++)
3615  {
3616  res_row_map[in_column_indices[i]] += in_values[i];
3617  }
3618 
3619  // Fill in the row start
3620  res_row_start.push_back(res_row_start.back() + res_row_map.size());
3621 
3622  // Now insert the key into res_column_indices and value into res_values
3623  for(std::map<int,double>::iterator it = res_row_map.begin();
3624  it != res_row_map.end(); ++it)
3625  {
3626  res_column_indices.push_back(it->first);
3627  res_values.push_back(it->second);
3628  }
3629  }
3630 
3631  // Finally build the result_matrix.
3632  if(result_matrix.distribution_pt()->built())
3633  {
3634  // Use the existing distribution.
3635  result_matrix.build(this->ncol(),res_values,
3636  res_column_indices,res_row_start);
3637  }
3638  else
3639  {
3640  // Build with THIS distribution
3641  result_matrix.build(this->distribution_pt(),this->ncol(),res_values,
3642  res_column_indices,res_row_start);
3643  }
3644 }
3645 
3646 //=================================================================
3647 /// Namespace for helper functions for CRDoubleMatrices
3648 //=================================================================
3649 namespace CRDoubleMatrixHelpers
3650 {
3651  //============================================================================
3652  /// \short Builds a uniformly distributed matrix.
3653  /// A locally replicated matrix is constructed then redistributed using
3654  /// OOMPH-LIB's default uniform row distribution.
3655  /// This is memory intensive and thus should be used for
3656  /// testing or small problems only.
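 ///
 /// A minimal sketch (serial data for a 2x2 identity matrix; comm_pt is
 /// assumed to point to a valid OomphCommunicator):
 /// \code
 /// Vector<double> values(2,1.0);
 /// Vector<int> column_indices(2); column_indices[0]=0; column_indices[1]=1;
 /// Vector<int> row_start(3); row_start[0]=0; row_start[1]=1; row_start[2]=2;
 /// CRDoubleMatrix matrix;
 /// CRDoubleMatrixHelpers::create_uniformly_distributed_matrix(
 ///  2,2,comm_pt,values,column_indices,row_start,matrix);
 /// \endcode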
3657  //============================================================================
3658  void create_uniformly_distributed_matrix(
3659  const unsigned &nrow, const unsigned &ncol,
3660  const OomphCommunicator* const comm_pt,
3661  const Vector<double> &values,
3662  const Vector<int> &column_indices, const Vector<int> &row_start,
3663  CRDoubleMatrix &matrix_out)
3664  {
3665 #ifdef PARANOID
3666  // Check if the communicator exists.
3667  if(comm_pt == 0)
3668  {
3669  std::ostringstream error_message;
3670  error_message << "Please supply the communicator.\n";
3671  throw OomphLibError(error_message.str(),
3672  OOMPH_CURRENT_FUNCTION,
3673  OOMPH_EXCEPTION_LOCATION);
3674  }
3675  // Is the out matrix built? We need an empty matrix!
3676  if(matrix_out.built())
3677  {
3678  std::ostringstream error_message;
3679  error_message << "The result matrix has been built.\n"
3680  << "Please clear the matrix.\n";
3681  throw OomphLibError(error_message.str(),
3682  OOMPH_CURRENT_FUNCTION,
3683  OOMPH_EXCEPTION_LOCATION);
3684  }
3685 #endif
3686 
3687  // Create the locally replicated distribution.
3688  bool distributed = false;
3689  LinearAlgebraDistribution
3690  locally_replicated_distribution(comm_pt,nrow,distributed);
3691 
3692  // Create the matrix.
3693  matrix_out.build(&locally_replicated_distribution,ncol,
3694  values,column_indices,row_start);
3695 
3696  // Create the distributed distribution.
3697  distributed = true;
3698  LinearAlgebraDistribution
3699  distributed_distribution(comm_pt,nrow,distributed);
3700 
3701  // Redistribute the matrix.
3702  matrix_out.redistribute(&distributed_distribution);
3703  }
3704 
3705  //============================================================================
3706  /// Compute infinity (maximum) norm of sub blocks as if it was one matrix
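 /// (The blocks are viewed as a single assembled matrix and the maximum
 /// absolute row sum of that assembled matrix is returned.)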
3707  //============================================================================
3708  double inf_norm(const DenseMatrix<CRDoubleMatrix*> &matrix_pt)
3709  {
3710 
3711  // The number of block rows and columns
3712  const unsigned nblockrow = matrix_pt.nrow();
3713  const unsigned nblockcol = matrix_pt.ncol();
3714 
3715 #ifdef PARANOID
3716  // Check that there is at least one matrix.
3717  if(matrix_pt.nrow() == 0)
3718  {
3719  std::ostringstream error_message;
3720  error_message << "There are no matrices... \n";
3721  throw OomphLibError(error_message.str(),
3722  OOMPH_CURRENT_FUNCTION,
3723  OOMPH_EXCEPTION_LOCATION);
3724  }
3725 
3726 
3727  // Check that all matrix_pt pointers are not null
3728  // and the matrices are built.
3729  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3730  {
3731  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3732  {
3733  if(matrix_pt(block_row_i,block_col_i) == 0)
3734  {
3735  std::ostringstream error_message;
3736  error_message << "The pointer matrix_pt(" << block_row_i
3737  << "," << block_col_i <<") is null.\n";
3738  throw OomphLibError(error_message.str(),
3739  OOMPH_CURRENT_FUNCTION,
3740  OOMPH_EXCEPTION_LOCATION);
3741  }
3742 
3743  if(!matrix_pt(block_row_i,block_col_i)->built())
3744  {
3745  std::ostringstream error_message;
3746  error_message << "The matrix at matrix_pt(" << block_row_i
3747  << "," << block_col_i <<") is not built.\n";
3748  throw OomphLibError(error_message.str(),
3749  OOMPH_CURRENT_FUNCTION,
3750  OOMPH_EXCEPTION_LOCATION);
3751  }
3752  }
3753  }
3754 #endif
3755 
3756 #ifdef OOMPH_HAS_MPI
3757 
3758  // The communicator pointer from block (0,0)
3759  const OomphCommunicator* const comm_pt
3760  = matrix_pt(0,0)->distribution_pt()->communicator_pt();
3761 
3762 #ifdef PARANOID
3763 
3764 
3765  // Check that all communicators are the same
3766  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3767  {
3768  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3769  {
3770  // Communicator for this block matrix.
3771  const OomphCommunicator current_block_comm
3772  = *(matrix_pt(block_row_i,block_col_i)
3773  ->distribution_pt()->communicator_pt());
3774  if(*comm_pt != current_block_comm)
3775  {
3776  std::ostringstream error_message;
3777  error_message << "The communicator of block matrix_pt("<< block_row_i
3778  << "," << block_col_i <<") is not the same as block "
3779  << "matrix_pt(0,0).\n";
3780  throw OomphLibError(error_message.str(),
3781  OOMPH_CURRENT_FUNCTION,
3782  OOMPH_EXCEPTION_LOCATION);
3783 
3784  }
3785  }
3786  }
3787 
3788  // Check that all distributed booleans are the same (if on more than 1 core)
3789  if(comm_pt->nproc() > 1)
3790  {
3791  // Get the distributed boolean from matrix_pt(0,0)
3792  bool first_distributed = matrix_pt(0,0)->distributed();
3793 
3794  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3795  {
3796  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3797  {
3798  // Is the current block distributed?
3799  bool current_distributed
3800  = matrix_pt(block_row_i,block_col_i)->distributed();
3801 
3802  if(first_distributed != current_distributed)
3803  {
3804  std::ostringstream error_message;
3805  error_message << "Block matrix_pt(" << block_row_i
3806  << ","<<block_col_i<<") and block matrix_pt(0,0) "
3807  << "have a different distributed boolean.\n";
3808  throw OomphLibError(error_message.str(),
3809  OOMPH_CURRENT_FUNCTION,
3810  OOMPH_EXCEPTION_LOCATION);
3811  }
3812  }
3813  }
3814  }
3815 
3816  // Check that all sub matrix dimensions "make sense"
3817  // We need to check that all the matrices in the same row have the same nrow.
3818  // Then repeat for the columns.
3819 
3820  // Check the nrow of each block row.
3821  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3822  {
3823  // Get the nrow to compare against from the first column.
3824  const unsigned first_block_nrow = matrix_pt(block_row_i,0)->nrow();
3825 
3826  // Loop through the block columns.
3827  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
3828  {
3829  // If the nrow of this block is not the same as the nrow from the first
3830  // block in this block row, throw an error.
3831  const unsigned current_block_nrow
3832  = matrix_pt(block_row_i,block_col_i)->nrow();
3833 
3834  if(first_block_nrow != current_block_nrow)
3835  {
3836  std::ostringstream error_message;
3837  error_message << "First block has nrow = " << first_block_nrow
3838  << ". But matrix_pt(" << block_row_i
3839  << "," << block_col_i <<") has nrow = "
3840  << current_block_nrow << ".\n";
3841  throw OomphLibError(error_message.str(),
3842  OOMPH_CURRENT_FUNCTION,
3843  OOMPH_EXCEPTION_LOCATION);
3844  }
3845 
3846  }
3847  }
3848 
3849  // Check the ncol of each block column.
3850  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3851  {
3852  // Get the ncol from the first block row to compare against.
3853  const unsigned first_block_ncol = matrix_pt(0,block_col_i)->ncol();
3854 
3855  for (unsigned block_row_i = 1; block_row_i < nblockrow; block_row_i++)
3856  {
3857  // Get the ncol for the current block.
3858  const unsigned current_block_ncol
3859  = matrix_pt(block_row_i,block_col_i)->ncol();
3860 
3861  if(first_block_ncol != current_block_ncol)
3862  {
3863  std::ostringstream error_message;
3864  error_message << "First block has ncol = " << first_block_ncol
3865  << ". But matrix_pt(" << block_row_i
3866  << "," << block_col_i <<") has ncol = "
3867  << current_block_ncol << ".\n";
3868  throw OomphLibError(error_message.str(),
3869  OOMPH_CURRENT_FUNCTION,
3870  OOMPH_EXCEPTION_LOCATION);
3871  }
3872  }
3873  }
3874 
3875  // Check that the distribution for each block row is the same.
3876  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3877  {
3878  // The first distribution of this block row.
3879  const LinearAlgebraDistribution first_dist
3880  = *(matrix_pt(block_row_i,0)->distribution_pt());
3881 
3882  // Loop through the rest of the block columns.
3883  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
3884  {
3885  // Get the distribution from the current block.
3886  const LinearAlgebraDistribution current_dist
3887  = matrix_pt(block_row_i,block_col_i)->distribution_pt();
3888 
3889  // Compare the first distribution against the current.
3890  if(first_dist != current_dist)
3891  {
3892  std::ostringstream error_message;
3893  error_message << "First distribution of block row " << block_row_i
3894  << " is different from the distribution from "
3895  << "matrix_pt(" << block_row_i
3896  << "," << block_col_i <<").\n";
3897  throw OomphLibError(error_message.str(),
3898  OOMPH_CURRENT_FUNCTION,
3899  OOMPH_EXCEPTION_LOCATION);
3900  }
3901  }
3902  }
3903 #endif
3904 
3905 #endif
3906 
3907  // Loop through the block rows, then block columns to
3908  // compute the local inf norm
3909  double inf_norm = 0;
3910  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3911  {
3912  // Get the number of local rows from the first block.
3913  unsigned block_nrow_local = matrix_pt(block_row_i,0)->nrow_local();
3914 
3915  // Loop through the block_nrow_local in this block row
3916  for (unsigned local_row_i = 0; local_row_i < block_nrow_local;
3917  local_row_i++)
3918  {
3919  double abs_sum_of_row = 0;
3920  // Loop through the block columns
3921  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3922  {
3923  // Locally cache the pointer to the current block.
3924  CRDoubleMatrix* block_pt = matrix_pt(block_row_i,block_col_i);
3925 
3926  const int* row_start = block_pt->row_start();
3927  const double* value = block_pt->value();
3928 
3929  // Loop through the values
3930  for (int val_i = row_start[local_row_i];
3931  val_i < row_start[local_row_i+1]; val_i++)
3932  {
3933  abs_sum_of_row += fabs(value[val_i]);
3934  }
3935  }
3936  // Store the max row
3937  // Store the max row sum
3938  }
3939  }
3940 
3941  // if the matrices are distributed over multiple processors then gather
3942 #ifdef OOMPH_HAS_MPI
3943  double inf_norm_local = inf_norm;
3944  if ( matrix_pt(0,0)->distributed() && comm_pt->nproc() > 1)
3945  {
3946  MPI_Allreduce(&inf_norm,&inf_norm_local,1,MPI_DOUBLE,MPI_MAX,
3947  comm_pt->mpi_comm());
3948  }
3949  inf_norm = inf_norm_local;
3950 #endif
3951 
3952  // and return
3953  return inf_norm;
3954  }
3955 
3956  //============================================================================
3957  /// \short Calculates the largest Gershgorin disc whilst preserving the sign.
3958  /// Let A be an n by n matrix with entries \f$ a_{ij} \f$. For \f$ i \in \{ 1,...,n \} \f$ let
3959  /// \f$ R_i = \sum_{j\neq i} |a_{ij}| \f$ be the sum of the absolute values of the
3960  /// non-diagonal entries in the i-th row. Let \f$ D(a_{ii},R_i) \f$ be the closed
3961  /// disc centered at \f$ a_{ii} \f$ with radius \f$ R_i \f$; such a disc is called a
3962  /// Gershgorin disc.
3963  ///
3964  /// \n
3965  ///
3966  /// We calculate \f$ |D(a_{ii},R_i)|_{max} \f$ and multiply by the sign of the diagonal
3967  /// entry.
3968  ///
3969  /// \n
3970  ///
3971  /// The DenseMatrix of CRDoubleMatrices is treated as if it were one
3972  /// large matrix. Therefore the dimensions of the sub matrices have to
3973  /// "make sense"; there is a paranoid check for this.
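 ///
 /// \n
 ///
 /// For example, for a single (serial) 2x2 block with rows (4, 1) and
 /// (2, 5), the disc edges furthest from zero are 4+1=5 and 5+2=7, so 7
 /// is returned; for a matrix with negative diagonal entries the most
 /// negative disc edge is returned instead.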
3974  //============================================================================
3975  double gershgorin_eigenvalue_estimate(
3976  const DenseMatrix<CRDoubleMatrix*> &matrix_pt)
3977  {
3978 
3979  // The number of block rows and columns
3980  const unsigned nblockrow = matrix_pt.nrow();
3981  const unsigned nblockcol = matrix_pt.ncol();
3982 
3983 #ifdef PARANOID
3984  // Check that there is at least one matrix.
3985  if(matrix_pt.nrow() == 0)
3986  {
3987  std::ostringstream error_message;
3988  error_message << "There are no matrices... \n";
3989  throw OomphLibError(error_message.str(),
3990  OOMPH_CURRENT_FUNCTION,
3991  OOMPH_EXCEPTION_LOCATION);
3992  }
3993 
3994 
3995  // Check that all matrix_pt pointers are not null
3996  // and the matrices are built.
3997  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3998  {
3999  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4000  {
4001  if(matrix_pt(block_row_i,block_col_i) == 0)
4002  {
4003  std::ostringstream error_message;
4004  error_message << "The pointer matrix_pt(" << block_row_i
4005  << "," << block_col_i <<") is null.\n";
4006  throw OomphLibError(error_message.str(),
4007  OOMPH_CURRENT_FUNCTION,
4008  OOMPH_EXCEPTION_LOCATION);
4009  }
4010 
4011  if(!matrix_pt(block_row_i,block_col_i)->built())
4012  {
4013  std::ostringstream error_message;
4014  error_message << "The matrix at matrix_pt(" << block_row_i
4015  << "," << block_col_i <<") is not built.\n";
4016  throw OomphLibError(error_message.str(),
4017  OOMPH_CURRENT_FUNCTION,
4018  OOMPH_EXCEPTION_LOCATION);
4019  }
4020  }
4021  }
4022 #endif
4023 
4024 
4025 #ifdef OOMPH_HAS_MPI
4026 
4027  // The communicator pointer from block (0,0)
4028  // All communicators should be the same, we check this next.
4029  const OomphCommunicator* const comm_pt
4030  = matrix_pt(0,0)->distribution_pt()->communicator_pt();
4031 
4032 #ifdef PARANOID
4033 
4034  // Check that all communicators are the same
4035  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4036  {
4037  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4038  {
4039  // Communicator for this block matrix.
4040  const OomphCommunicator current_block_comm
4041  = *(matrix_pt(block_row_i,block_col_i)
4042  ->distribution_pt()->communicator_pt());
4043  if(*comm_pt != current_block_comm)
4044  {
4045  std::ostringstream error_message;
4046  error_message << "The communicator of block matrix_pt("<< block_row_i
4047  << "," << block_col_i <<") is not the same as block "
4048  << "matrix_pt(0,0).\n";
4049  throw OomphLibError(error_message.str(),
4050  OOMPH_CURRENT_FUNCTION,
4051  OOMPH_EXCEPTION_LOCATION);
4052 
4053  }
4054  }
4055  }
4056 
4057  // Check that all distributed booleans are the same (if on more than 1 core)
4058  if(comm_pt->nproc() > 1)
4059  {
4060  // Get the distributed boolean from matrix_pt(0,0)
4061  bool first_distributed = matrix_pt(0,0)->distributed();
4062 
4063  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4064  {
4065  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4066  {
4067  // Is the current block distributed?
4068  bool current_distributed
4069  = matrix_pt(block_row_i,block_col_i)->distributed();
4070 
4071  if(first_distributed != current_distributed)
4072  {
4073  std::ostringstream error_message;
4074  error_message << "Block matrix_pt(" << block_row_i
4075  << ","<<block_col_i<<") and block matrix_pt(0,0) "
4076  << "have a different distributed boolean.\n";
4077  throw OomphLibError(error_message.str(),
4078  OOMPH_CURRENT_FUNCTION,
4079  OOMPH_EXCEPTION_LOCATION);
4080  }
4081  }
4082  }
4083  }
4084 
4085  // Check that all sub matrix dimensions "make sense"
4086  // We need to check that all the matrices in the same row have the same nrow.
4087  // Then repeat for the columns.
4088 
4089  // Check the nrow of each block row.
4090  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4091  {
4092  // Get the nrow to compare against from the first column.
4093  const unsigned first_block_nrow = matrix_pt(block_row_i,0)->nrow();
4094 
4095  // Loop through the block columns.
4096  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
4097  {
4098  // If the nrow of this block is not the same as the nrow from the first
4099  // block in this block row, throw an error.
4100  const unsigned current_block_nrow
4101  = matrix_pt(block_row_i,block_col_i)->nrow();
4102 
4103  if(first_block_nrow != current_block_nrow)
4104  {
4105  std::ostringstream error_message;
4106  error_message << "First block has nrow = " << first_block_nrow
4107  << ". But matrix_pt(" << block_row_i
4108  << "," << block_col_i <<") has nrow = "
4109  << current_block_nrow << ".\n";
4110  throw OomphLibError(error_message.str(),
4111  OOMPH_CURRENT_FUNCTION,
4112  OOMPH_EXCEPTION_LOCATION);
4113  }
4114 
4115  }
4116  }
4117 
4118  // Check the ncol of each block column.
4119  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4120  {
4121  // Get the ncol from the first block row to compare against.
4122  const unsigned first_block_ncol = matrix_pt(0,block_col_i)->ncol();
4123 
4124  for (unsigned block_row_i = 1; block_row_i < nblockrow; block_row_i++)
4125  {
4126  // Get the ncol for the current block.
4127  const unsigned current_block_ncol
4128  = matrix_pt(block_row_i,block_col_i)->ncol();
4129 
4130  if(first_block_ncol != current_block_ncol)
4131  {
4132  std::ostringstream error_message;
4133  error_message << "First block has ncol = " << first_block_ncol
4134  << ". But matrix_pt(" << block_row_i
4135  << "," << block_col_i <<") has ncol = "
4136  << current_block_ncol << ".\n";
4137  throw OomphLibError(error_message.str(),
4138  OOMPH_CURRENT_FUNCTION,
4139  OOMPH_EXCEPTION_LOCATION);
4140  }
4141  }
4142  }
4143 
4144  // Check that the distribution for each block row is the same.
4145  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4146  {
4147  // The first distribution of this block row.
4148  const LinearAlgebraDistribution first_dist
4149  = *(matrix_pt(block_row_i,0)->distribution_pt());
4150 
4151  // Loop through the rest of the block columns.
4152  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
4153  {
4154  // Get the distribution from the current block.
4155  const LinearAlgebraDistribution current_dist
4156  = matrix_pt(block_row_i,block_col_i)->distribution_pt();
4157 
4158  // Compare the first distribution against the current.
4159  if(first_dist != current_dist)
4160  {
4161  std::ostringstream error_message;
4162  error_message << "First distribution of block row " << block_row_i
4163  << " is different from the distribution from "
4164  << "matrix_pt(" << block_row_i
4165  << "," << block_col_i <<").\n";
4166  throw OomphLibError(error_message.str(),
4167  OOMPH_CURRENT_FUNCTION,
4168  OOMPH_EXCEPTION_LOCATION);
4169  }
4170  }
4171  }
4172 
4173 #endif
4174 #endif
4175 
4176  // Loop through the block rows, then block columns, to
4177  // compute the local extreme Gershgorin disc value
4178  double extreme_disc = 0;
4179  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4180  {
4181  // Get the number of local rows from the first block.
4182  unsigned block_nrow_local = matrix_pt(block_row_i,0)->nrow_local();
4183 
4184  // Loop through the block_nrow_local in this block row
4185  for (unsigned local_row_i = 0; local_row_i < block_nrow_local;
4186  local_row_i++)
4187  {
4188  double abs_sum_of_row = 0;
4189  // Loop through the block columns
4190  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4191  {
4192  // Locally cache the pointer to the current block.
4193  CRDoubleMatrix* block_pt = matrix_pt(block_row_i,block_col_i);
4194 
4195  const int* row_start = block_pt->row_start();
4196  const double* value = block_pt->value();
4197 
4198  // Loop through the values
4199  for (int val_i = row_start[local_row_i];
4200  val_i < row_start[local_row_i+1]; val_i++)
4201  {
4202  abs_sum_of_row += fabs(value[val_i]);
4203  }
4204  }
4205 
4206  // Now subtract the diagonal entry...
4207  // Locate the diagonal block matrix.
4208  double* s_values = matrix_pt(block_row_i,block_row_i)->value();
4209  int* s_column_index = matrix_pt(block_row_i,block_row_i)->column_index();
4210  int* s_row_start = matrix_pt(block_row_i,block_row_i)->row_start();
4211  //int s_nrow_local = matrix_pt(block_row_i,block_row_i)->nrow_local();
4212  int s_first_row = matrix_pt(block_row_i,block_row_i)->first_row();
4213 
4214  // Get the diagonal value...
4215  double diagonal_value = 0;
4216  bool found = false;
4217  for (int j = s_row_start[local_row_i];
4218  j < s_row_start[local_row_i+1] && !found; j++)
4219  {
4220  if (s_column_index[j] == int(local_row_i + s_first_row))
4221  {
4222  diagonal_value = s_values[j];
4223  found = true;
4224  }
4225  }
4226 
4227  // Check if the diagonal entry is found.
4228  if(!found)
4229  {
4230  std::ostringstream error_message;
4231  error_message << "The diagonal entry for the block("
4232  << block_row_i<<","<<block_row_i<<")\n"
4233  << "on local row " << local_row_i << " does not exist."
4234  << std::endl;
4235  throw OomphLibError(error_message.str(),
4236  OOMPH_CURRENT_FUNCTION,
4237  OOMPH_EXCEPTION_LOCATION);
4238  }
4239 
4240  // This is the disc.
4241  abs_sum_of_row -= fabs(diagonal_value);
4242 
4243  // Now we have to check if the diagonal entry is
4244  // on the left or right side of zero.
4245  if(diagonal_value > 0)
4246  {
4247  double extreme_disc_local = diagonal_value + abs_sum_of_row;
4248  extreme_disc = std::max(extreme_disc_local,extreme_disc);
4249  }
4250  else
4251  {
4252  double extreme_disc_local = diagonal_value - abs_sum_of_row;
4253  extreme_disc = std::min(extreme_disc_local,extreme_disc);
4254  }
4255  } // Loop through local row (of all block column)
4256  } // Loop through block row
4257 
4258  // if the matrices are distributed over multiple processors then gather
4259 #ifdef OOMPH_HAS_MPI
4260  double extreme_disc_local = extreme_disc;
4261  if ( matrix_pt(0,0)->distributed() && comm_pt->nproc() > 1)
4262  {
4263  if(extreme_disc > 0)
4264  {
4265  MPI_Allreduce(&extreme_disc,&extreme_disc_local,1,MPI_DOUBLE,MPI_MAX,
4266  comm_pt->mpi_comm());
4267  }
4268  else
4269  {
4270  MPI_Allreduce(&extreme_disc,&extreme_disc_local,1,MPI_DOUBLE,MPI_MIN,
4271  comm_pt->mpi_comm());
4272  }
4273  }
4274  extreme_disc = extreme_disc_local;
4275 #endif
4276 
4277  // and return
4278  return extreme_disc;
4279  }
4280 
4281  //============================================================================
4282  /// \short Concatenate CRDoubleMatrix matrices.
4283  /// The in matrices are concatenated such that the block structure of the
4284  /// in matrices are preserved in the result matrix. Communication between
4285  /// processors is required. If the block structure of the sub matrices does
4286  /// not need to be preserved, consider using
4287  /// CRDoubleMatrixHelpers::concatenate_without_communication(...).
4288  ///
4289  /// The matrix manipulation functions
4290  /// CRDoubleMatrixHelpers::concatenate(...) and
4291  /// CRDoubleMatrixHelpers::concatenate_without_communication(...)
4292  /// are analogous to the Vector manipulation functions
4293  /// DoubleVectorHelpers::concatenate(...) and
4294  /// DoubleVectorHelpers::concatenate_without_communication(...).
4295  /// Please look at the DoubleVector functions for an illustration of the
4296  /// differences between concatenate(...) and
4297  /// concatenate_without_communication(...).
4298  ///
4299  /// Distribution of the result matrix:
4300  /// If the result matrix does not have a distribution built, then it will be
4301  /// given a uniform row distribution. Otherwise we use the existing
4302  /// distribution. This gives the user the ability to define their own
4303  /// distribution, or save computing power if a distribution has
4304  /// been pre-built.
4305  ///
4306  /// NOTE: ALL the matrices pointed to by matrix_pt have to be built. This is
4307  /// not the case with concatenate_without_communication(...)
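 ///
 /// A minimal usage sketch (hypothetical names; a, b, c and d are built
 /// CRDoubleMatrix objects with compatible dimensions and distributions):
 /// \code
 /// DenseMatrix<CRDoubleMatrix*> matrix_pt(2,2);
 /// matrix_pt(0,0)=&a; matrix_pt(0,1)=&b;
 /// matrix_pt(1,0)=&c; matrix_pt(1,1)=&d;
 /// CRDoubleMatrix result;
 /// CRDoubleMatrixHelpers::concatenate(matrix_pt,result);
 /// \endcode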
4308  //============================================================================
4309  void concatenate(const DenseMatrix<CRDoubleMatrix*> &matrix_pt,
4310  CRDoubleMatrix &result_matrix)
4311  {
4312 
4313  // The number of block rows and block columns.
4314  unsigned matrix_nrow = matrix_pt.nrow();
4315  unsigned matrix_ncol = matrix_pt.ncol();
4316 
4317  // PARANOID checks involving only the in matrices.
4318 #ifdef PARANOID
4319  // Are there matrices to concatenate?
4320  if(matrix_nrow == 0)
4321  {
4322  std::ostringstream error_message;
4323  error_message << "There are no matrices to concatenate.\n";
4324  throw OomphLibError(error_message.str(),
4325  OOMPH_CURRENT_FUNCTION,
4326  OOMPH_EXCEPTION_LOCATION);
4327  }
4328 
4329  // Does this matrix need concatenating?
4330  if((matrix_nrow == 1) && (matrix_ncol == 1))
4331  {
4332  std::ostringstream warning_message;
4333  warning_message << "There is only one matrix to concatenate...\n"
4334  << "This does not require concatenating...\n";
4335  OomphLibWarning(warning_message.str(),
4336  OOMPH_CURRENT_FUNCTION,
4337  OOMPH_EXCEPTION_LOCATION);
4338  }
4339 
4340  // Are all sub matrices built?
4341  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4342  {
4343  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4344  {
4345  if (!(matrix_pt(block_row_i,block_col_i)->built()))
4346  {
4347  std::ostringstream error_message;
4348  error_message
4349  << "The sub matrix (" << block_row_i << "," << block_col_i << ")\n"
4350  << "is not built. \n";
4351  throw OomphLibError(error_message.str(),
4352  OOMPH_CURRENT_FUNCTION,
4353  OOMPH_EXCEPTION_LOCATION);
4354  }
4355  }
4356  }
4357 
4358  // Do all dimensions of sub matrices "make sense"?
4359  // Compare the number of rows of each block matrix in a block row.
4360  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4361  {
4362  // Use the first column to compare against the rest.
4363  unsigned long current_block_nrow = matrix_pt(block_row_i,0)->nrow();
4364 
4365  // Compare against columns 1 to matrix_ncol - 1
4366  for(unsigned block_col_i = 1; block_col_i < matrix_ncol; block_col_i++)
4367  {
4368  // Get the nrow for this sub block.
4369  unsigned long subblock_nrow
4370  = matrix_pt(block_row_i,block_col_i)->nrow();
4371 
4372  if(current_block_nrow != subblock_nrow)
4373  {
4374  std::ostringstream error_message;
4375  error_message
4376  << "The sub matrix (" << block_row_i << "," << block_col_i << ")\n"
4377  << "requires nrow = " << current_block_nrow
4378  << ", but has nrow = " << subblock_nrow <<".\n";
4379  throw OomphLibError(error_message.str(),
4380  OOMPH_CURRENT_FUNCTION,
4381  OOMPH_EXCEPTION_LOCATION);
4382  }
4383  }
4384  }
4385 
4386  // Compare the number of columns of each block matrix in a block column.
4387  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4388  {
4389  // Use the first row to compare against the rest.
4390  unsigned long current_block_ncol = matrix_pt(0,block_col_i)->ncol();
4391 
4392  // Compare against rows 1 to matrix_nrow - 1
4393  for(unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
4394  {
4395  // Get the ncol for this sub block.
4396  unsigned long subblock_ncol
4397  = matrix_pt(block_row_i,block_col_i)->ncol();
4398 
4399  if(current_block_ncol != subblock_ncol)
4400  {
4401  std::ostringstream error_message;
4402  error_message
4403  << "The sub matrix (" << block_row_i << "," << block_col_i << ")\n"
4404  << "requires ncol = " << current_block_ncol
4405  << ", but has ncol = " << subblock_ncol << ".\n";
4406  throw OomphLibError(error_message.str(),
4407  OOMPH_CURRENT_FUNCTION,
4408  OOMPH_EXCEPTION_LOCATION);
4409  }
4410  }
4411  }
4412 #endif
4413 
4414  // The communicator pointer from block (0,0)
4415  const OomphCommunicator* const comm_pt
4416  = matrix_pt(0,0)->distribution_pt()->communicator_pt();
4417 
4418  // Check if the block (0,0) is distributed or not.
4419  bool distributed = matrix_pt(0,0)->distributed();
4420 
4421  // If the result matrix does not have a distribution, we create a uniform
4422  // distribution.
4423  if(!result_matrix.distribution_pt()->built())
4424  {
4425  // Sum of sub matrix nrow. We use the first column.
4426  unsigned tmp_nrow = 0;
4427  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4428  {
4429  tmp_nrow +=matrix_pt(block_row_i,0)->nrow();
4430  }
4431 
4432  LinearAlgebraDistribution tmp_distribution(comm_pt,tmp_nrow,distributed);
4433 
4434  result_matrix.build(&tmp_distribution);
4435  }
4436  else
4437  // A distribution is supplied for the result matrix.
4438  {
4439 #ifdef PARANOID
4440  // Check if the sum of the nrow from the sub matrices is the same as the
4441  // Check if the sum of the nrow from the sub matrices is the same as
4442 
4443  // Sum of sub matrix nrow. We use the first column.
4444  unsigned tmp_nrow = 0;
4445  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4446  {
4447  tmp_nrow +=matrix_pt(block_row_i,0)->nrow();
4448  }
4449 
4450  if(tmp_nrow != result_matrix.nrow())
4451  {
4452  std::ostringstream error_message;
4453  error_message << "The total number of rows from the matrices to\n"
4454  << "concatenate does not match the nrow from the\n"
4455  << "result matrix\n";
4456  throw OomphLibError(error_message.str(),
4457  OOMPH_CURRENT_FUNCTION,
4458  OOMPH_EXCEPTION_LOCATION);
4459  }
4460 #endif
4461  }
4462 
4463 #ifdef PARANOID
4464 
4465  // Are all the communicators the same?
4466  // Compare the communicator for sub matrices (against the result matrix).
4467  {
4468  const OomphCommunicator communicator
4469  = *(result_matrix.distribution_pt()->communicator_pt());
4470 
4471  // Are all communicator pointers the same?
4472  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4473  {
4474  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4475  {
4476  const OomphCommunicator another_communicator
4477  = *(matrix_pt(block_row_i,block_col_i)
4478  ->distribution_pt()->communicator_pt());
4479 
4480  if(!(communicator == another_communicator))
4481  {
4482  std::ostringstream error_message;
4483  error_message
4484  << "The OomphCommunicator of the sub matrix ("
4485  << block_row_i << "," << block_col_i << ")\n"
4486  << "does not have the same communicator as the result matrix. \n";
4487  throw OomphLibError(error_message.str(),
4488  OOMPH_CURRENT_FUNCTION,
4489  OOMPH_EXCEPTION_LOCATION);
4490  }
4491  }
4492  }
4493  }
4494 
4495  // Are all the distributed booleans the same? This only applies if we have
4496  // more than one processor. If there is only one processor, then it does
4497  // not matter if it is distributed or not - they are conceptually the same.
4498  if(comm_pt->nproc() != 1)
4499  {
4500  // Compare distributed for sub matrices (against the result matrix).
4501  const bool res_distributed = result_matrix.distributed();
4502 
4503  // Loop over all sub blocks.
4504  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4505  {
4506  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4507  {
4508  const bool another_distributed
4509  = matrix_pt(block_row_i,block_col_i)->distributed();
4510 
4511  if(res_distributed != another_distributed)
4512  {
4513  std::ostringstream error_message;
4514  error_message
4515  << "The distributed boolean of the sub matrix ("
4516  << block_row_i << "," << block_col_i << ")\n"
4517  << "is not the same as the result matrix. \n";
4518  throw OomphLibError(error_message.str(),
4519  OOMPH_CURRENT_FUNCTION,
4520  OOMPH_EXCEPTION_LOCATION);
4521  }
4522  }
4523  }
4524  }
4525 #endif
4526 
4527 
4528  // Get the number of columns up to each block for offset
4529  // in calculating the result column indices.
4530  // Since the number of columns in each block column is the same,
4531  // we only loop through the first block row (row zero).
4532  Vector<unsigned long> sum_of_ncol_up_to_block(matrix_ncol);
4533 
4534  // Also compute the total number of columns to build the resulting matrix.
4535  unsigned long res_ncol = 0;
4536 
4537  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4538  {
4539  sum_of_ncol_up_to_block[block_col_i] = res_ncol;
4540  res_ncol += matrix_pt(0,block_col_i)->ncol();
4541  }
4542 
4543  // We begin the process of extracting and ordering the values,
4544  // column_indices and row_start of all the sub blocks.
4545  if((comm_pt->nproc() == 1) || !distributed)
4546  // Serial version of the code.
4547  {
4548  // Get the total number of non zero entries so we can reserve storage for
4549  // the values and column_indices vectors.
4550  unsigned long res_nnz = 0;
4551  for (unsigned block_row_i = 0;
4552  block_row_i < matrix_nrow; block_row_i++)
4553  {
4554  for (unsigned block_col_i = 0;
4555  block_col_i < matrix_ncol; block_col_i++)
4556  {
4557  res_nnz+=matrix_pt(block_row_i,block_col_i)->nnz();
4558  }
4559  }
4560 
4561  // Declare the vectors required to build a CRDoubleMatrix
4562  Vector<double> res_values;
4563  Vector<int> res_column_indices;
4564  Vector<int> res_row_start;
4565 
4566  // Reserve space for the vectors.
4567  res_values.reserve(res_nnz);
4568  res_column_indices.reserve(res_nnz);
4569  res_row_start.reserve(result_matrix.nrow()+1);
4570 
4571  // Now we fill in the data.
4572 
4573  // Running sum of nnz per row.
4574  int nnz_running_sum = 0;
4575 
4576  // Loop through the block rows.
4577  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4578  {
4579  // Get the number of rows in this block row, from the first block.
4580  unsigned long block_row_nrow = matrix_pt(block_row_i,0)->nrow();
4581 
4582  // Loop through the number of rows in this block row
4583  for (unsigned row_i = 0; row_i < block_row_nrow; row_i++)
4584  {
4585  // The row start is the nnz at the start of the row.
4586  res_row_start.push_back(nnz_running_sum);
4587 
4588  // Loop through the block columns
4589  for (unsigned block_col_i = 0;
4590  block_col_i < matrix_ncol; block_col_i++)
4591  {
4592  // Get the current block.
4593  CRDoubleMatrix* current_block_pt
4594  = matrix_pt(block_row_i,block_col_i);
4595 
4596  // Get the values, column_indices and row_start for this block.
4597  double* current_block_values = current_block_pt->value();
4598  int* current_block_column_indices
4599  = current_block_pt->column_index();
4600  int* current_block_row_start = current_block_pt->row_start();
4601 
4602  for (int val_i = current_block_row_start[row_i];
4603  val_i < current_block_row_start[row_i+1]; val_i++)
4604  {
4605  res_values.push_back(current_block_values[val_i]);
4606  res_column_indices.push_back(current_block_column_indices[val_i]
4607  + sum_of_ncol_up_to_block[block_col_i]);
4608  }
4609 
4610  // Update the running sum of nnz per row
4611  nnz_running_sum += current_block_row_start[row_i+1]
4612  - current_block_row_start[row_i];
4613  } // for block cols
4614  } // for rows
4615  } // for block rows
4616 
4617  // Fill in the last row start
4618  res_row_start.push_back(res_nnz);
4619 
4620  // Build the matrix
4621  result_matrix.build(res_ncol,res_values,res_column_indices,res_row_start);
4622  }
4623  // Otherwise we are dealing with a distributed matrix.
4624  else
4625  {
4626 #ifdef OOMPH_HAS_MPI
4627 
4628  // Flag to enable timing. This is for debugging
4629  // and/or testing purposes only.
4630  bool enable_timing = false;
4631 
4632  // Get the number of processors
4633  unsigned nproc = comm_pt->nproc();
4634 
4635  // My rank
4636  unsigned my_rank = comm_pt->my_rank();
4637 
4638  // Storage for the data (per processor) to send.
4639  Vector<Vector<unsigned> > column_indices_to_send(nproc);
4640  Vector<Vector<double> > values_to_send(nproc);
4641 
4642  // The sum of the nrow for the sub blocks (so far). This is used as an
4643  // offset to calculate the global equation number in the result matrix.
4644  unsigned long sum_of_block_nrow = 0;
4645 
4646  double t_prep_data_start;
4647  if(enable_timing)
4648  {
4649  t_prep_data_start = TimingHelpers::timer();
4650  }
4651 
4652  // Get the pointer to the result distribution, for convenience...
4653  LinearAlgebraDistribution* res_distribution_pt
4654  = result_matrix.distribution_pt();
4655 
4656  // loop over the sub blocks to calculate the global_eqn, get the values
4657  // and column indices.
4658  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4659  {
4660  // Get the number of local rows in this block_row from the first block.
4661  unsigned current_block_nrow_local = matrix_pt(block_row_i,0)
4662  ->nrow_local();
4663 
4664  // Get the first_row for this block_row
4665  unsigned current_block_row_first_row = matrix_pt(block_row_i,0)
4666  ->first_row();
4667 
4668  // Loop through the number of local rows
4669  for(unsigned sub_local_eqn = 0;
4670  sub_local_eqn < current_block_nrow_local; sub_local_eqn++)
4671  {
4672  // Calculate the corresponding (res_global_eqn) equation number
4673  // for this local row number in this block.
4674  unsigned long res_global_eqn = sub_local_eqn
4675  + current_block_row_first_row + sum_of_block_nrow;
4676 
4677  // Get the processor that this global row belongs to.
4678  // The rank_of_global_row(...) function loops through all the
4679  // processors and does two unsigned comparisons. Since we have to do
4680  // this for every row, it may be better to store a lookup table when a
4681  // very large number of processors is used.
4682  unsigned res_p = res_distribution_pt
4683  ->rank_of_global_row(res_global_eqn);
4684 
4685  // With the res_p, we get the res_first_row to
4686  // work out the res_local_eqn
4687  unsigned res_first_row = res_distribution_pt->first_row(res_p);
4688  unsigned res_local_eqn = res_global_eqn - res_first_row;
4689 
4690  // Loop through the block columns, calculate the nnz. This is used to
4691  // reserve space for the value and column_indices Vectors.
4692  unsigned long current_row_nnz = 0;
4693  for (unsigned block_col_i = 0;
4694  block_col_i < matrix_ncol; block_col_i++)
4695  {
4696  // Get the row_start
4697  int* current_block_row_start = matrix_pt(block_row_i,block_col_i)
4698  ->row_start();
4699 
4700  // Update the nnz for this row.
4701  current_row_nnz += current_block_row_start[sub_local_eqn+1]
4702  - current_block_row_start[sub_local_eqn];
4703  } // for block column, get nnz.
4704 
4705  // Reserve space for efficiency.
4706  //unsigned capacity_in_res_p_vec
4707  // = column_indices_to_send[res_p].capacity();
4708 
4709  // Reserve memory for nnz+2, since we need to store the res_local_eqn
4710  // and nnz as well as the data (values/column indices).
4711  // Note: The two reserve functions are called per row.
4712  // If the matrix is very sparse (just a few elements per row), it
4713  // will be more efficient to not reserve and let the STL vector
4714  // handle this. On average, this implementation is more efficient.
4715  //column_indices_to_send[res_p].reserve(capacity_in_res_p_vec
4716  // + current_row_nnz+2);
4717  //values_to_send[res_p].reserve(capacity_in_res_p_vec
4718  // + current_row_nnz+2);
4719 
4720  // Push back the res_local_eqn and nnz
4721  column_indices_to_send[res_p].push_back(res_local_eqn);
4722  column_indices_to_send[res_p].push_back(current_row_nnz);
4723  values_to_send[res_p].push_back(res_local_eqn);
4724  values_to_send[res_p].push_back(current_row_nnz);
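  // The data for each row destined for processor res_p is thus laid out
  // as [res_local_eqn, nnz, entry_1, ..., entry_nnz] in both the column
  // index and the value vectors; this "header" is unpacked again on the
  // receiving side further below.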
4725 
4726  // Loop through the block columns again and get the values
4727  // and column_indices
4728  for (unsigned block_col_i = 0;
4729  block_col_i < matrix_ncol; block_col_i++)
4730  {
4731  // Cache the pointer to the current block for convenience.
4732  CRDoubleMatrix* current_block_pt
4733  = matrix_pt(block_row_i,block_col_i);
4734 
4735  // Values, column indices and row_start for the current block.
4736  double* current_block_values = current_block_pt->value();
4737  int* current_block_column_indices
4738  = current_block_pt->column_index();
4739  int* current_block_row_start = current_block_pt->row_start();
4740 
4741  // Loop through the values and column_indices
4742  for (int val_i = current_block_row_start[sub_local_eqn];
4743  val_i < current_block_row_start[sub_local_eqn+1]; val_i++)
4744  {
4745  // Push back the value.
4746  values_to_send[res_p].push_back(current_block_values[val_i]);
4747 
4748  // Push back the (offset) column index.
4749  column_indices_to_send[res_p].push_back(
4750  current_block_column_indices[val_i]
4751  + sum_of_ncol_up_to_block[block_col_i]);
4752  } // for block columns
4753  } // for block column, get values and column_indices.
4754  } // for sub_local_eqn
4755 
4756  // update the sum_of_block_nrow
4757  sum_of_block_nrow += matrix_pt(block_row_i,0)->nrow();
4758 
4759  } // for block_row
4760 
4761  if(enable_timing)
4762  {
4763  double t_prep_data_finish = TimingHelpers::timer();
4764  double t_prep_data_time = t_prep_data_finish - t_prep_data_start;
4765  oomph_info << "Time for prep data: " << t_prep_data_time << std::endl;
4766  }
4767 
4768  // Prepare to send data!
4769 
4770  // Storage for the number of data to be sent to each processor.
4771  Vector<int> send_n(nproc,0);
4772 
4773  // Storage for all the values/column indices to be sent
4774  // to each processor.
4775  Vector<double> send_values_data;
4776  Vector<unsigned> send_column_indices_data;
4777 
4778  // Storage location within send_values_data
4779  // (and send_column_indices_data) for data to be sent to each processor.
4780  Vector<int> send_displacement(nproc,0);
4781 
4782  double t_total_ndata_start;
4783  if(enable_timing)
4784  t_total_ndata_start = TimingHelpers::timer();
4785 
4786  // Get the total amount of data which needs to be sent, so we can reserve
4787  // space for it.
4788  unsigned total_ndata = 0;
4789  for (unsigned rank = 0; rank < nproc; rank++)
4790  {
4791  if(rank != my_rank)
4792  {
4793  total_ndata += values_to_send[rank].size();
4794  }
4795  }
4796 
4797  if(enable_timing)
4798  {
4799  double t_total_ndata_finish = TimingHelpers::timer();
4800  double t_total_ndata_time= t_total_ndata_finish - t_total_ndata_start;
4801  oomph_info << "Time for total_ndata: " << t_total_ndata_time << std::endl;
4802  }
4803 
4804  double t_flat_pack_start;
4805  if(enable_timing)
4806  t_flat_pack_start = TimingHelpers::timer();
4807 
4808  // Now we don't have to re-allocate data/memory when push_back is called.
4809  // Nb. Using push_back without reserving memory may cause multiple
4810  // re-allocations behind the scenes, which is expensive.
4811  send_values_data.reserve(total_ndata);
4812  send_column_indices_data.reserve(total_ndata);
4813 
4814  // Loop over all the processors to "flat pack" the data for sending.
4815  for (unsigned rank = 0; rank < nproc; rank++)
4816  {
4817  // Set the offset for the current processor
4818  // This only has to be done once for both values and column indices.
4819  send_displacement[rank] = send_values_data.size();
4820 
4821  // Don't bother to do anything if
4822  // the processor in the loop is the current processor.
4823  if(rank != my_rank)
4824  {
4825  // Put the values into the send data vector.
4826  // n_data is the same for both values and column indices.
4827  unsigned n_data = values_to_send[rank].size();
4828  for (unsigned j = 0; j < n_data; j++)
4829  {
4830  send_values_data.push_back(values_to_send[rank][j]);
4831  send_column_indices_data.push_back(
4832  column_indices_to_send[rank][j]);
4833  } // for
4834  } // if rank != my_rank
4835 
4836  // Find the number of data to be added to the vector.
4837  // send_n is the same for both values and column indices.
4838  send_n[rank] = send_values_data.size() - send_displacement[rank];
4839  } // loop over processors
4840 
4841  if(enable_timing)
4842  {
4843  double t_flat_pack_finish = TimingHelpers::timer();
4844  double t_flat_pack_time = t_flat_pack_finish - t_flat_pack_start;
4845  oomph_info << "t_flat_pack_time: " << t_flat_pack_time << std::endl;
4846  }
4847 
4848  double t_sendn_start;
4849  if(enable_timing)
4850  t_sendn_start = TimingHelpers::timer();
4851 
4852  // Storage for the number of data to be received from each processor
4853  Vector<int> receive_n(nproc,0);
4854 
4855  MPI_Alltoall(&send_n[0],1,MPI_INT,&receive_n[0],1,MPI_INT,
4856  comm_pt->mpi_comm());
4857 
4858  if(enable_timing)
4859  {
4860  double t_sendn_finish = TimingHelpers::timer();
4861  double t_sendn_time = t_sendn_finish - t_sendn_start;
4862  oomph_info << "t_sendn_time: " << t_sendn_time << std::endl;
4863  }
4864 
4865 
4866  // Prepare the data to be received
4867  // by working out the displacement from the received data
4868  // receive_displacement is the same for both values and column indices.
4869  Vector<int> receive_displacement(nproc,0);
4870  int receive_data_count = 0;
4871  for (unsigned rank = 0; rank < nproc; rank++)
4872  {
4873  receive_displacement[rank] = receive_data_count;
4874  receive_data_count += receive_n[rank];
4875  }
4876 
4877  // Now resize the receive buffer for all data from all processors.
4878  // Make sure that it has a size of at least one.
4879  if(receive_data_count == 0){receive_data_count++;}
4880  Vector<double> receive_values_data(receive_data_count);
4881  Vector<unsigned> receive_column_indices_data(receive_data_count);
4882 
4883  // Make sure that the send buffer has size at least one
4884  // so that we don't get a segmentation fault.
4885  if(send_values_data.size()==0){send_values_data.resize(1);}
4886 
4887  double t_send_data_start;
4888  if(enable_timing)
4889  t_send_data_start = TimingHelpers::timer();
4890 
4891  // Now send the data between all processors
4892  MPI_Alltoallv(&send_values_data[0],&send_n[0],&send_displacement[0],
4893  MPI_DOUBLE,
4894  &receive_values_data[0],&receive_n[0],
4895  &receive_displacement[0],
4896  MPI_DOUBLE,
4897  comm_pt->mpi_comm());
4898 
4899  // Now send the data between all processors
4900  MPI_Alltoallv(&send_column_indices_data[0],&send_n[0],
4901  &send_displacement[0],
4902  MPI_UNSIGNED,
4903  &receive_column_indices_data[0],&receive_n[0],
4904  &receive_displacement[0],
4905  MPI_UNSIGNED,
4906  comm_pt->mpi_comm());
4907 
4908  if(enable_timing)
4909  {
4910  double t_send_data_finish = TimingHelpers::timer();
4911  double t_send_data_time = t_send_data_finish - t_send_data_start;
4912  oomph_info << "t_send_data_time: " << t_send_data_time << std::endl;
4913  }
4914 
4915  // All the rows for this processor are stored in:
4916  // from other processors:
4917  // receive_column_indices_data and receive_values_data
4918  // from this processor:
4919  // column_indices_to_send[my_rank] and values_to_send[my_rank]
4920  //
4921  // They are in some order determined by the distribution.
4922  // We need to re-arrange them. To do this, we do some pre-processing.
4923 
4924  // nrow_local for this processor.
4925  unsigned res_nrow_local = res_distribution_pt->nrow_local();
4926 
4927  // Per row, store:
4928  // 1) where this row came from, 0 - this proc, 1 - other procs.
4929  // 2) the nnz,
4930  // 3) the offset - where the values/columns in the receive data vectors
4931  // begins. This is different from the offset of where
4932  // the data from a certain processor starts.
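  // e.g. value_column_locations[5] = {1, 3, 42} would mean that local row 5
  // arrived from another processor, has 3 non-zeros, and that its column
  // indices/values start at index 42 of the corresponding data vectors.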
4933  Vector<Vector<unsigned> > value_column_locations(res_nrow_local,
4934  Vector<unsigned>(3,0));
4935 
4936  // Store the local nnz so we can reserve space for
4937  // the values and column indices.
4938  unsigned long res_nnz_local = 0;
4939 
4940  double t_locations_start;
4941  if(enable_timing)
4942  t_locations_start = TimingHelpers::timer();
4943 
4944  // Loop through the data currently on this processor.
4945  unsigned location_i = 0;
4946  unsigned my_column_indices_to_send_size
4947  = column_indices_to_send[my_rank].size();
4948  while(location_i < my_column_indices_to_send_size)
4949  {
4950  unsigned current_local_eqn
4951  = column_indices_to_send[my_rank][location_i++];
4952 
4953  unsigned current_nnz
4954  = column_indices_to_send[my_rank][location_i++];
4955 
4956  // No need to fill [*][0] with 0 since it is already initialised to 0.
4957 
4958  // Store the nnz.
4959  value_column_locations[current_local_eqn][1] = current_nnz;
4960 
4961  // Also increment the res_local_nnz
4962  res_nnz_local += current_nnz;
4963 
4964  // Store the offset.
4965  value_column_locations[current_local_eqn][2] = location_i;
4966 
4967  // Update the location_i so it starts at the next row.
4968  location_i += current_nnz;
4969  }
4970 
4971  // Loop through the data from different processors.
4972 
4973  // Check to see if data has been received.
4974  bool data_has_been_received = false;
4975  unsigned send_rank = 0;
4976  while(send_rank < nproc)
4977  {
4978  if(receive_n[send_rank] > 0)
4979  {
4980  data_has_been_received = true;
4981  break;
4982  }
4983  send_rank++;
4984  }
4985 
4986  location_i = 0; // start at 0.
4987  if(data_has_been_received)
4988  {
4989  unsigned receive_column_indices_data_size
4990  = receive_column_indices_data.size();
4991  while(location_i < receive_column_indices_data_size)
4992  {
4993  unsigned current_local_eqn
4994  = receive_column_indices_data[location_i++];
4995  unsigned current_nnz = receive_column_indices_data[location_i++];
4996 
4997  // This row comes from another processor.
4998  value_column_locations[current_local_eqn][0] = 1;
4999 
5000  // Store the nnz.
5001  value_column_locations[current_local_eqn][1] = current_nnz;
5002 
5003  // Also increment the res_local_nnz
5004  res_nnz_local += current_nnz;
5005 
5006  // Store the offset.
5007  value_column_locations[current_local_eqn][2] = location_i;
5008 
5009  // Update the location_i so it starts at the next row.
5010  location_i += current_nnz;
5011  }
5012  }
5013 
5014  if(enable_timing)
5015  {
5016  double t_locations_finish = TimingHelpers::timer();
5017  double t_locations_time = t_locations_finish - t_locations_start;
5018  oomph_info << "t_locations_time: " << t_locations_time << std::endl;
5019  }
5020 
5021  double t_fillvecs_start;
5022  if(enable_timing)
5023  t_fillvecs_start = TimingHelpers::timer();
5024 
5025  // Now loop through the locations and store the values and
5026  // the column indices in the correct order.
5027  Vector<int> res_column_indices;
5028  Vector<double> res_values;
5029  Vector<int> res_row_start;
5030 
5031  res_column_indices.reserve(res_nnz_local);
5032  res_values.reserve(res_nnz_local);
5033  res_row_start.reserve(res_nrow_local+1);
5034 
5035  // Running sum of nnz for the row_start. Must be int because
5036  // res_row_start is templated with int.
5037  int nnz_running_sum = 0;
5038 
5039  // Now insert the rows.
5040  for (unsigned local_row_i = 0;
5041  local_row_i < res_nrow_local; local_row_i++)
5042  {
5043  // Fill the res_row_start with the nnz so far.
5044  res_row_start.push_back(nnz_running_sum);
5045 
5046  bool data_is_from_other_proc
5047  = bool(value_column_locations[local_row_i][0]);
5048 
5049  unsigned row_i_nnz = value_column_locations[local_row_i][1];
5050 
5051  unsigned row_i_offset = value_column_locations[local_row_i][2];
5052 
5053  if(data_is_from_other_proc)
5054  {
5055  // Insert range [offset, offset+nnz) from
5056  // receive_column_indices_data and receive_values_data into
5057  // res_column_indices and res_values respectively.
5058  res_column_indices.insert(res_column_indices.end(),
5059  receive_column_indices_data.begin()
5060  +row_i_offset,
5061  receive_column_indices_data.begin()
5062  +row_i_offset+row_i_nnz);
5063 
5064  res_values.insert(res_values.end(),
5065  receive_values_data.begin()+row_i_offset,
5066  receive_values_data.begin()+row_i_offset+row_i_nnz);
5067  }
5068  else
5069  {
5070  res_column_indices.insert(res_column_indices.end(),
5071  column_indices_to_send[my_rank].begin()
5072  +row_i_offset,
5073  column_indices_to_send[my_rank].begin()
5074  +row_i_offset+row_i_nnz);
5075 
5076  res_values.insert(res_values.end(),
5077  values_to_send[my_rank].begin()+row_i_offset,
5078  values_to_send[my_rank].begin()+row_i_offset+row_i_nnz);
5079  }
5080 
5081  // Update the running sum of nnz
5082  nnz_running_sum += row_i_nnz;
5083  }
5084 
5085  // Insert the last row_start value
5086  res_row_start.push_back(res_nnz_local);
5087 
5088  if(enable_timing)
5089  {
5090  double t_fillvecs_finish = TimingHelpers::timer();
5091  double t_fillvecs_time = t_fillvecs_finish - t_fillvecs_start;
5092  oomph_info << "t_fillvecs_time: " << t_fillvecs_time << std::endl;
5093  }
5094 
5095  double t_buildres_start;
5096  if(enable_timing)
5097  t_buildres_start = TimingHelpers::timer();
5098 
5099  // build the matrix.
5100  result_matrix.build(res_ncol,res_values,res_column_indices,res_row_start);
5101 
5102  if(enable_timing)
5103  {
5104  double t_buildres_finish = TimingHelpers::timer();
5105  double t_buildres_time = t_buildres_finish - t_buildres_start;
5106  oomph_info << "t_buildres_time: " << t_buildres_time << std::endl;
5107  }
5108  // */
5109 #endif
5110  }
5111  }
5112 
5113  //============================================================================
5114  /// \short Concatenate CRDoubleMatrix matrices.
5115  ///
5116  /// The Vector row_distribution_pt contains the LinearAlgebraDistribution
5117  /// of each block row.
5118  /// The Vector col_distribution_pt contains the LinearAlgebraDistribution
5119  /// of each block column.
5120  /// The DenseMatrix matrix_pt contains pointers to the CRDoubleMatrices
5121  /// to concatenate.
5122  /// The CRDoubleMatrix result_matrix is the result matrix.
5123  ///
5124  /// The result matrix is a permutation of the sub matrices such that the data
5125  /// stays on the same processor when the result matrix is built; there is no
5126  /// communication between processors.
5127  /// Thus the block structure of the sub matrices is NOT preserved in the
5128  /// result matrix. The rows are block-permuted, defined by the concatenation
5129  /// of the distributions in row_distribution_pt. Similarly, the columns are
5130  /// block-permuted, defined by the concatenation of the distributions in
5131  /// col_distribution_pt.
5132  /// For more details on the block-permutation, see
5133  /// LinearAlgebraDistributionHelpers::concatenate(...).
5134  ///
5135  /// If one wishes to preserve the block structure of the sub matrices in the
5136  /// result matrix, consider using CRDoubleMatrixHelpers::concatenate(...),
5137  /// which uses communication between processors to ensure that the block
5138  /// structure of the sub matrices is preserved.
5139  ///
5140  /// The matrix manipulation functions
5141  /// CRDoubleMatrixHelpers::concatenate(...) and
5142  /// CRDoubleMatrixHelpers::concatenate_without_communication(...)
5143  /// are analogous to the Vector manipulation functions
5144  /// DoubleVectorHelpers::concatenate(...) and
5145  /// DoubleVectorHelpers::concatenate_without_communication(...).
5146  /// Please look at the DoubleVector functions for an illustration of the
5147  /// differences between concatenate(...) and
5148  /// concatenate_without_communication(...).
5149  ///
5150  /// Distribution of the result matrix:
5151  /// If the result matrix does not have a distribution built, then it will be
5152  /// given a distribution built from the concatenation of the distributions
5153  /// from row_distribution_pt, see
5154  /// LinearAlgebraDistributionHelpers::concatenate(...) for more detail.
5155  /// Otherwise we use the existing distribution.
5156  /// If there is an existing distribution then it must be the same as the
5157  /// distribution from the concatenation of row distributions as described
5158  /// above.
5159  /// Why don't we always compute the distribution "on the fly"?
5160  /// Because a non-uniform distribution requires communication.
5161  /// All block preconditioner distributions are concatenations of the
5162  /// distributions of the individual blocks.
5163  //============================================================================
5164  void concatenate_without_communication(
5165  const Vector<LinearAlgebraDistribution*> &row_distribution_pt,
5166  const Vector<LinearAlgebraDistribution*> &col_distribution_pt,
5167  const DenseMatrix<CRDoubleMatrix*> &matrix_pt,
5168  CRDoubleMatrix &result_matrix)
5169  {
5170  // The number of block rows and block columns.
5171  unsigned matrix_nrow = matrix_pt.nrow();
5172  unsigned matrix_ncol = matrix_pt.ncol();
5173 
5174  // PARANOID checks involving the input matrices and distributions only.
5175  // PARANOID checks involving the result matrix will come later since
5176  // we have to create the result matrix distribution from the input distributions
5177  // if it does not already exist.
5178 #ifdef PARANOID
5179 
5180  // Are there matrices to concatenate?
5181  if(matrix_nrow == 0 || matrix_ncol == 0)
5182  {
5183  std::ostringstream error_message;
5184  error_message << "There are no matrices to concatenate.\n";
5185  throw OomphLibError(error_message.str(),
5186  OOMPH_CURRENT_FUNCTION,
5187  OOMPH_EXCEPTION_LOCATION);
5188  }
5189 
5190  // Does this matrix need concatenating?
5191  if((matrix_nrow == 1) && (matrix_ncol == 1))
5192  {
5193  std::ostringstream warning_message;
5194  warning_message << "There is only one matrix to concatenate...\n"
5195  << "This does not require concatenating...\n";
5196  OomphLibWarning(warning_message.str(),
5197  OOMPH_CURRENT_FUNCTION,
5198  OOMPH_EXCEPTION_LOCATION);
5199  }
5200 
5201 
5202 
5203  // The distribution for each block row is stored in row_distribution_pt.
5204  // So the number of distributions in row_distribution_pt must be the
5205  // same as matrix_nrow.
5206  if(matrix_nrow != row_distribution_pt.size())
5207  {
5208  std::ostringstream error_message;
5209  error_message << "The number of row distributions must be the same as\n"
5210  << "the number of block rows.";
5211  throw OomphLibError(error_message.str(),
5212  OOMPH_CURRENT_FUNCTION,
5213  OOMPH_EXCEPTION_LOCATION);
5214  }
5215 
5216  // The number of distributions for the columns must match the number of
5217  // block columns.
5218  if(matrix_ncol != col_distribution_pt.size())
5219  {
5220  std::ostringstream error_message;
5221  error_message << "The number of column distributions must be the same as\n"
5222  << "the number of block columns.";
5223  throw OomphLibError(error_message.str(),
5224  OOMPH_CURRENT_FUNCTION,
5225  OOMPH_EXCEPTION_LOCATION);
5226  }
5227 
5228  // Check that all pointers in row_distribution_pt are not null.
5229  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5230  {
5231  if(row_distribution_pt[block_row_i] == 0)
5232  {
5233  std::ostringstream error_message;
5234  error_message << "The row distribution pointer in position "
5235  << block_row_i <<" is null.\n";
5236  throw OomphLibError(error_message.str(),
5237  OOMPH_CURRENT_FUNCTION,
5238  OOMPH_EXCEPTION_LOCATION);
5239  }
5240  }
5241 
5242  // Check that all pointers in col_distribution_pt are not null.
5243  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5244  {
5245  if(col_distribution_pt[block_col_i] == 0)
5246  {
5247  std::ostringstream error_message;
5248  error_message << "The column distribution pointer in position "
5249  << block_col_i <<" is null.\n";
5250  throw OomphLibError(error_message.str(),
5251  OOMPH_CURRENT_FUNCTION,
5252  OOMPH_EXCEPTION_LOCATION);
5253  }
5254  }
5255 
5256  // Check that all distributions are built.
5257  // First the row distributions
5258  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5259  {
5260  if(!row_distribution_pt[block_row_i]->built())
5261  {
5262  std::ostringstream error_message;
5263  error_message << "The row distribution pointer in position "
5264  << block_row_i <<" is not built.\n";
5265  throw OomphLibError(error_message.str(),
5266  OOMPH_CURRENT_FUNCTION,
5267  OOMPH_EXCEPTION_LOCATION);
5268  }
5269  }
5270  // Now the column distributions
5271  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5272  {
5273  if(!col_distribution_pt[block_col_i]->built())
5274  {
5275  std::ostringstream error_message;
5276  error_message << "The column distribution pointer in position "
5277  << block_col_i <<" is not built.\n";
5278  throw OomphLibError(error_message.str(),
5279  OOMPH_CURRENT_FUNCTION,
5280  OOMPH_EXCEPTION_LOCATION);
5281  }
5282  }
5283 
5284  // Check that all communicators in row_distribution_pt are the same.
5285  const OomphCommunicator first_row_comm
5286  = *(row_distribution_pt[0]->communicator_pt());
5287 
5288  for (unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
5289  {
5290  const OomphCommunicator current_comm
5291  = *(row_distribution_pt[block_row_i]->communicator_pt());
5292 
5293  if(first_row_comm != current_comm)
5294  {
5295  std::ostringstream error_message;
5296  error_message <<"The communicator from the row distribution in position "
5297  << block_row_i << " is not the same as the first "
5298  << "communicator from row_distribution_pt";
5299  throw OomphLibError(error_message.str(),
5300  OOMPH_CURRENT_FUNCTION,
5301  OOMPH_EXCEPTION_LOCATION);
5302  }
5303  }
5304 
5305  // Check that all communicators in col_distribution_pt are the same as the
5306  // first row communicator from above.
5307  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5308  {
5309  const OomphCommunicator current_comm
5310  = *(col_distribution_pt[block_col_i]->communicator_pt());
5311 
5312  if(first_row_comm != current_comm)
5313  {
5314  std::ostringstream error_message;
5315  error_message <<"The communicator from the col distribution in position "
5316  << block_col_i << " is not the same as the first "
5317  << "communicator from row_distribution_pt";
5318  throw OomphLibError(error_message.str(),
5319  OOMPH_CURRENT_FUNCTION,
5320  OOMPH_EXCEPTION_LOCATION);
5321  }
5322  }
5323 
5324  // Are all sub matrices built? If the matrix_pt is not null, make sure
5325  // that it is built.
5326  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5327  {
5328  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5329  {
5330  if (matrix_pt(block_row_i,block_col_i) != 0 &&
5331  !(matrix_pt(block_row_i,block_col_i)->built()))
5332  {
5333  std::ostringstream error_message;
5334  error_message
5335  << "The sub matrix_pt(" << block_row_i << "," << block_col_i << ")\n"
5336  << "is not built.\n";
5337  throw OomphLibError(error_message.str(),
5338  OOMPH_CURRENT_FUNCTION,
5339  OOMPH_EXCEPTION_LOCATION);
5340  }
5341  }
5342  }
5343 
5344  // For the matrices which are built, do they have the same communicator as
5345  // the first communicator from row_distribution_pt?
5346  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5347  {
5348  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5349  {
5350  if(matrix_pt(block_row_i,block_col_i) != 0)
5351  {
5352  const OomphCommunicator current_comm
5353  = *(matrix_pt(block_row_i,block_col_i)
5354  ->distribution_pt()
5355  ->communicator_pt());
5356  if(first_row_comm != current_comm)
5357  {
5358  std::ostringstream error_message;
5359  error_message
5360  << "The sub matrix_pt(" << block_row_i << ","<<block_col_i<< ")\n"
5361  << "does not have the same communicator pointer as those in\n"
5362  << "(row|col)_distribution_pt.\n";
5363  throw OomphLibError(error_message.str(),
5364  OOMPH_CURRENT_FUNCTION,
5365  OOMPH_EXCEPTION_LOCATION);
5366  }
5367  }
5368  }
5369  }
5370 
5371  // Do all dimensions of sub matrices "make sense"?
5372  // Compare the number of rows of each block matrix in a block row.
5373  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5374  {
5375  // Get the required number of rows for this block row from its distribution.
5376  unsigned long current_block_nrow = row_distribution_pt[block_row_i]
5377  ->nrow();
5378 
5379  // Compare against columns 0 to matrix_ncol - 1
5380  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5381  {
5382  // Perform the check if the matrix_pt is not null.
5383  if(matrix_pt(block_row_i,block_col_i) != 0)
5384  {
5385  // Get the nrow for this sub block.
5386  unsigned long subblock_nrow
5387  = matrix_pt(block_row_i,block_col_i)->nrow();
5388 
5389  if(current_block_nrow != subblock_nrow)
5390  {
5391  std::ostringstream error_message;
5392  error_message
5393  << "The sub matrix (" << block_row_i << "," << block_col_i << ")\n"
5394  << "requires nrow = " << current_block_nrow
5395  << ", but has nrow = " << subblock_nrow <<".\n"
5396  << "Either the row_distribution_pt is incorrect or "
5397  << "the sub matrices are incorrect.\n";
5398  throw OomphLibError(error_message.str(),
5399  OOMPH_CURRENT_FUNCTION,
5400  OOMPH_EXCEPTION_LOCATION);
5401  }
5402  }
5403  }
5404  }
5405 
5406  // Compare the number of columns of each block matrix in a block column.
5407  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5408  {
5409  // Get the current block ncol from the linear algebra distribution.
5410  // Note: the column distribution's nrow() gives the ncol of this block column.
5411  unsigned current_block_ncol = col_distribution_pt[block_col_i]
5412  ->nrow();
5413 
5414  for(unsigned block_row_i = 0;
5415  block_row_i < matrix_nrow; block_row_i++)
5416  {
5417  if(matrix_pt(block_row_i,block_col_i) != 0)
5418  {
5419  // Get the ncol for this sub block.
5420  unsigned subblock_ncol
5421  = matrix_pt(block_row_i,block_col_i)->ncol();
5422 
5423  if(current_block_ncol != subblock_ncol)
5424  {
5425  std::ostringstream error_message;
5426  error_message
5427  << "The sub matrix (" << block_row_i << "," << block_col_i << ")\n"
5428  << "requires ncol = " << current_block_ncol
5429  << ", but has ncol = " << subblock_ncol << ".\n"
5430  << "Either the col_distribution_pt is incorrect or "
5431  << "the sub matrices are incorrect.\n";
5432  throw OomphLibError(error_message.str(),
5433  OOMPH_CURRENT_FUNCTION,
5434  OOMPH_EXCEPTION_LOCATION);
5435  }
5436  }
5437  }
5438  }
5439 
5440  // Ensure that the distributions for all sub matrices in the same block row
5441  // are the same. This is because we permute the rows across several matrices.
5442 
5443  // Loop through each block row.
5444  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5445  {
5446  // Get the prescribed distribution for this block row.
5447  LinearAlgebraDistribution* block_row_distribution_pt
5448  = row_distribution_pt[block_row_i];
5449 
5450  // Loop through the block columns
5451  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5452  {
5453  if(matrix_pt(block_row_i,block_col_i) != 0)
5454  {
5455  // Get the distribution for this block.
5456  LinearAlgebraDistribution* current_block_distribution_pt
5457  = matrix_pt(block_row_i,block_col_i)->distribution_pt();
5458 
5459  // Ensure that this block has the prescribed row distribution.
5460  if((*block_row_distribution_pt) != (*current_block_distribution_pt))
5461  {
5462  std::ostringstream error_message;
5463  error_message << "Sub block("<<block_row_i<<","<<block_col_i<<") "
5464  << "does not have the same distribution as the "
5465  << "corresponding row distribution.\n"
5466  << "All matrices in a block row must have the same "
5467  << "distribution for this function to concatenate matrices.\n";
5468  throw OomphLibError(error_message.str(),
5469  OOMPH_CURRENT_FUNCTION,
5470  OOMPH_EXCEPTION_LOCATION);
5471  }
5472  }
5473  }
5474  }
5475 #endif
5476 
5477  // The communicator pointer from the first row_distribution_pt
5478  const OomphCommunicator* const comm_pt
5479  = row_distribution_pt[0]->communicator_pt();
5480 
5481  // Renamed so it makes more sense.
5482  unsigned nblock_row = matrix_nrow;
5483 
5484  // If the result matrix does not have a distribution, then we concatenate
5485  // the sub matrix distributions.
5486  if(!result_matrix.distribution_pt()->built())
5487  {
5488  // The result distribution
5489  LinearAlgebraDistribution tmp_distribution;
5490  LinearAlgebraDistributionHelpers::concatenate(row_distribution_pt,
5491  tmp_distribution);
5492 
5493  result_matrix.build(&tmp_distribution);
5494  }
5495  else
5496  // A distribution is supplied for the result matrix.
5497  {
5498 #ifdef PARANOID
5499  // Check that the result distribution is a concatenation of the
5500  // distributions of the sub matrices.
5501 
5502  LinearAlgebraDistribution wanted_distribution;
5503 
5504  LinearAlgebraDistributionHelpers::concatenate(row_distribution_pt,
5505  wanted_distribution);
5506 
5507  if(*(result_matrix.distribution_pt()) != wanted_distribution)
5508  {
5509  std::ostringstream error_message;
5510  error_message << "The result distribution is not correct.\n"
5511  << "Please call the function without a result\n"
5512  << "distribution (clear the result matrix) or check the\n"
5513  << "distribution of the result matrix.\n"
5514  << "The result distribution must be the same as the one \n"
5515  << "created by\n"
5516  << "LinearAlgebraDistributionHelpers::concatenate(...)";
5517  throw OomphLibError(error_message.str(),
5518  OOMPH_CURRENT_FUNCTION,
5519  OOMPH_EXCEPTION_LOCATION);
5520  }
5521 #endif
5522  }
5523 
5524  // The rest of the paranoid checks.
5525 #ifdef PARANOID
5526 
5527  // Make sure that the communicator from the result matrix is the same as
5528  // all the others. This test is redundant if this function created the
5529  // result matrix distribution, since then it is guaranteed that the
5530  // communicators are the same.
5531  {
5532  // Communicator from the result matrix.
5533  const OomphCommunicator res_comm
5534  = *(result_matrix.distribution_pt()->communicator_pt());
5535 
5536  // Is the result communicator pointer the same as the others?
5537  // Since we have already tested the others, we only need to compare against
5538  // one of them. Say the first communicator from row_distribution_pt.
5539  const OomphCommunicator first_comm
5540  = *(row_distribution_pt[0]->communicator_pt());
5541 
5542  if(res_comm != first_comm)
5543  {
5544  std::ostringstream error_message;
5545  error_message
5546  << "The OomphCommunicator of the result matrix is not the same as the "
5547  << "others!";
5548  throw OomphLibError(error_message.str(),
5549  OOMPH_CURRENT_FUNCTION,
5550  OOMPH_EXCEPTION_LOCATION);
5551  }
5552  }
5553 
5554  // Are all the distributed booleans the same? This only applies if we have
5555  // more than one processor. If there is only one processor, then it does
5556  // not matter if it is distributed or not - they are conceptually the same.
5557  if(comm_pt->nproc() != 1)
5558  {
5559  // Compare distributed for sub matrices (against the result matrix).
5560  const bool res_distributed = result_matrix.distributed();
5561 
5562  // Loop over all sub blocks.
5563  for(unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5564  {
5565  for(unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5566  {
5567  if(matrix_pt(block_row_i,block_col_i) != 0)
5568  {
5569  const bool another_distributed
5570  = matrix_pt(block_row_i,block_col_i)->distributed();
5571 
5572  if(res_distributed != another_distributed)
5573  {
5574  std::ostringstream error_message;
5575  error_message
5576  << "The distributed boolean of the sub matrix ("
5577  << block_row_i << "," << block_col_i << ")\n"
5578  << "is not the same as the result matrix. \n";
5579  throw OomphLibError(error_message.str(),
5580  OOMPH_CURRENT_FUNCTION,
5581  OOMPH_EXCEPTION_LOCATION);
5582  }
5583  }
5584  }
5585  }
5586 
5587  // Do this test for row_distribution_pt
5588  const bool first_row_distribution_distributed
5589  = row_distribution_pt[0]->distributed();
5590 
5591  for (unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
5592  {
5593  const bool another_distributed
5594  = row_distribution_pt[block_row_i]->distributed();
5595 
5596  if(first_row_distribution_distributed != another_distributed)
5597  {
5598  std::ostringstream error_message;
5599  error_message
5600  << "The distributed boolean of row_distribution_pt["
5601  << block_row_i << "]\n"
5602  << "is not the same as the one from row_distribution_pt[0]. \n";
5603  throw OomphLibError(error_message.str(),
5604  OOMPH_CURRENT_FUNCTION,
5605  OOMPH_EXCEPTION_LOCATION);
5606  }
5607  }
5608 
5609  // Repeat for col_distribution_pt
5610  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5611  {
5612  const bool another_distributed
5613  = col_distribution_pt[block_col_i]->distributed();
5614 
5615  if(first_row_distribution_distributed != another_distributed)
5616  {
5617  std::ostringstream error_message;
5618  error_message
5619  << "The distributed boolean of col_distribution_pt["
5620  << block_col_i << "]\n"
5621  << "is not the same as the one from row_distribution_pt[0]. \n";
5622  throw OomphLibError(error_message.str(),
5623  OOMPH_CURRENT_FUNCTION,
5624  OOMPH_EXCEPTION_LOCATION);
5625  }
5626  }
5627  }
5628 #endif
5629 
5630  ////////////////// END OF PARANOID TESTS ////////////////////////////////////
5631 
5632  // The number of processors.
5633  unsigned nproc = comm_pt->nproc();
5634 
5635  // Cache the result distribution pointer for convenience.
5636  LinearAlgebraDistribution* res_distribution_pt
5637  = result_matrix.distribution_pt();
5638 
5639  // nrow_local for the result matrix
5640  unsigned res_nrow_local = res_distribution_pt->nrow_local();
5641 
5642  // renamed for readability.
5643  unsigned nblock_col = matrix_ncol;
5644 
5645  // construct the block offset
5646 // DenseMatrix<unsigned> col_offset(nproc,nblock_col,0);
5647  std::vector<std::vector<unsigned> > col_offset(
5648  nproc,
5649  std::vector<unsigned>(nblock_col));
5650  unsigned off = 0;
5651  for (unsigned proc_i = 0; proc_i < nproc; proc_i++)
5652  {
5653  for (unsigned block_i = 0; block_i < nblock_col; block_i++)
5654  {
5655  col_offset[proc_i][block_i] = off;
5656  off +=col_distribution_pt[block_i]->nrow_local(proc_i);
5657  }
5658  }
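  // The column ordering of the concatenated matrix is therefore grouped
  // first by processor and then by block column, e.g. with two processors
  // and two block columns the global columns are laid out as
  //   [p0/block0 | p0/block1 | p1/block0 | p1/block1],
  // each segment holding that block's local columns on that processor.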
5659 
5660  // Do some pre-processing to work out which processor a global row number
5661  // lives on. This is required when permuting the column entries.
5662  // We need to do this for each distribution, so we have a vector of
5663  // vectors. First index corresponds to the distribution, the second is
5664  // the processor number.
5665  std::vector<std::vector<unsigned> > p_for_rows(
5666  nblock_col,std::vector<unsigned>());
5667  // initialise 2D vector
5668  for (unsigned blocki = 0; blocki < nblock_col; blocki++)
5669  {
5670  int blockinrow = col_distribution_pt[blocki]->nrow();
5671  p_for_rows[blocki].resize(blockinrow);
5672  // For each global index in the block, work out the corresponding processor.
5673  for (int rowi = 0; rowi < blockinrow; rowi++)
5674  {
5675  unsigned p = 0;
5676  int b_first_row = col_distribution_pt[blocki]->first_row(p);
5677  int b_nrow_local = col_distribution_pt[blocki]->nrow_local(p);
5678 
5679  while (rowi < b_first_row ||
5680  rowi >= b_nrow_local + b_first_row)
5681  {
5682  p++;
5683  b_first_row = col_distribution_pt[blocki]->first_row(p);
5684  b_nrow_local = col_distribution_pt[blocki]->nrow_local(p);
5685  }
5686  p_for_rows[blocki][rowi] = p;
5687  }
5688  }
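  // After this, p_for_rows[j][gi] gives the processor that would hold global
  // index gi under the j-th column distribution, so the rank lookup in the
  // assembly loop below is a simple table access rather than a search.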
5689 
5690  // determine nnz of all blocks on this processor only.
5691  // This is used to create storage space.
5692  unsigned long res_nnz = 0;
5693  for (unsigned row_i = 0; row_i < nblock_row; row_i++)
5694  {
5695  for (unsigned col_i = 0; col_i < nblock_col; col_i++)
5696  {
5697  if(matrix_pt(row_i,col_i) !=0)
5698  {
5699  res_nnz += matrix_pt(row_i,col_i)->nnz();
5700  }
5701  }
5702  }
5703 
5704  // My rank
5705 // unsigned my_rank = comm_pt->my_rank();
5706 // my_rank = my_rank;
5707 
5708  // Turn the above into a string.
5709 // std::ostringstream myrankstream;
5710 // myrankstream << "THISDOESNOTHINGnp" << my_rank << std::endl;
5711 // std::string myrankstring = myrankstream.str();
5712 
5713 
5714 //CALLGRIND_ZERO_STATS;
5715 //CALLGRIND_START_INSTRUMENTATION;
5716 
5717  // storage for the result matrix.
5718  int* res_row_start = new int[res_nrow_local+1];
5719  int* res_column_index = new int[res_nnz];
5720  double* res_value = new double[res_nnz];
5721 
5722  // initialise the zero-th entry
5723  res_row_start[0] = 0;
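  // The three arrays form the usual compressed-row (CSR) storage:
  // res_row_start[r]..res_row_start[r+1] delimit the entries of local row r
  // inside res_column_index and res_value.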
5724 
5725  // loop over the block rows
5726  unsigned long res_i = 0; // index for the result matrix.
5727  unsigned long res_row_i = 0; // index for the row
5728  for (unsigned i = 0; i < nblock_row; i++)
5729  {
5730  // loop over the rows of the current block local rows.
5731  unsigned block_nrow = row_distribution_pt[i]->nrow_local();
5732  for (unsigned k = 0; k < block_nrow; k++)
5733  {
5734  // initialise res_row_start
5735  res_row_start[res_row_i+1] = res_row_start[res_row_i];
5736 
5737  // Loop over the block columns
5738  for (unsigned j = 0; j < nblock_col; j++)
5739  {
5740  // if block(i,j) pointer is not null then
5741  if (matrix_pt(i,j) != 0)
5742  {
5743  // get pointers for the elements in the current block
5744  int* b_row_start = matrix_pt(i,j)->row_start();
5745  int* b_column_index = matrix_pt(i,j)->column_index();
5746  double* b_value = matrix_pt(i,j)->value();
5747 
5748  // memcpy(&dst[dstIdx], &src[srcIdx], numElementsToCopy*sizeof(Element));
5749  // Number of elements to copy for this local row.
5750  int numEleToCopy = b_row_start[k+1] - b_row_start[k];
5751  memcpy(res_value+res_i,b_value+b_row_start[k],numEleToCopy*sizeof(double));
5752  // Loop through the current local row.
5753  for (int l = b_row_start[k]; l < b_row_start[k+1]; l++)
5754  {
5755  // If b_column_index[l] were a row index, which processor
5756  // would it be on?
5757 // unsigned p = col_distribution_pt[j]
5758 // ->rank_of_global_row_map(b_column_index[l]);
5759  unsigned p = p_for_rows[j][b_column_index[l]];
5760 
5761  int b_first_row = col_distribution_pt[j]->first_row(p);
5762 // int b_nrow_local = col_distribution_pt[j]->nrow_local(p);
5763 
5764 // while (b_column_index[l] < b_first_row ||
5765 // b_column_index[l] >= b_nrow_local+b_first_row)
5766 // {
5767 // p++;
5768 // b_first_row = col_distribution_pt[j]->first_row(p);
5769 // b_nrow_local = col_distribution_pt[j]->nrow_local(p);
5770 // }
5771 
5772  // determine the local equation number in the block j/processor p
5773  // "column block"
5774  int eqn = b_column_index[l]-b_first_row;
5775 
5776  // add to the result matrix
5777 // res_value[res_i] = b_value[l];
5778  res_column_index[res_i] = col_offset[p][j]+eqn;
5779  res_row_start[res_row_i+1]++;
5780  res_i++;
5781  }
5782  }
5783  }
5784 
5785  // increment the row pt
5786  res_row_i++;
5787 
5788  }
5789  }
5790 //CALLGRIND_STOP_INSTRUMENTATION;
5791 //CALLGRIND_DUMP_STATS_AT(myrankstring.c_str());
5792 
5793 
5794  // Get the number of columns of the result matrix.
5795  unsigned res_ncol = 0;
5796  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5797  {
5798  res_ncol += col_distribution_pt[block_col_i]->nrow();
5799  }
5800 
5801  // Build the result matrix.
5802  result_matrix.build_without_copy(res_ncol,res_nnz,
5803  res_value,
5804  res_column_index,
5805  res_row_start);
5806  }
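  //--------------------------------------------------------------------------
  // Illustrative usage sketch (hypothetical names, not part of the library
  // source): given four already-built CRDoubleMatrix blocks a00, a01, a10,
  // a11 whose distributions match dist0 (first block row/column) and dist1
  // (second block row/column), the blocks could be concatenated without
  // communication along the following lines:
  //
  //   Vector<LinearAlgebraDistribution*> row_dist_pt(2), col_dist_pt(2);
  //   row_dist_pt[0] = &dist0; row_dist_pt[1] = &dist1;
  //   col_dist_pt[0] = &dist0; col_dist_pt[1] = &dist1;
  //
  //   DenseMatrix<CRDoubleMatrix*> block_pt(2,2,0);
  //   block_pt(0,0) = &a00; block_pt(0,1) = &a01;
  //   block_pt(1,0) = &a10; block_pt(1,1) = &a11;
  //
  //   CRDoubleMatrix result;
  //   CRDoubleMatrixHelpers::concatenate_without_communication(
  //    row_dist_pt,col_dist_pt,block_pt,result);
  //--------------------------------------------------------------------------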
5807 
5808 
5809 
5810  //============================================================================
5811  /// \short Concatenate CRDoubleMatrix matrices.
5812  /// This calls the other concatenate_without_communication(...) function,
5813  /// passing block_distribution_pt as both the row_distribution_pt and
5814  /// col_distribution_pt. This should only be called for block square matrices.
5815  //============================================================================
5816  void concatenate_without_communication(
5817  const Vector<LinearAlgebraDistribution*> &block_distribution_pt,
5818  const DenseMatrix<CRDoubleMatrix*> &matrix_pt,
5819  CRDoubleMatrix &result_matrix)
5820  {
5821 
5822 #ifdef PARANOID
5823  // The number of block rows and block columns.
5824  unsigned matrix_nrow = matrix_pt.nrow();
5825  unsigned matrix_ncol = matrix_pt.ncol();
5826 
5827  // Are there matrices to concatenate?
5828  if(matrix_nrow == 0)
5829  {
5830  std::ostringstream error_message;
5831  error_message << "There are no matrices to concatenate.\n";
5832  throw OomphLibError(error_message.str(),
5833  OOMPH_CURRENT_FUNCTION,
5834  OOMPH_EXCEPTION_LOCATION);
5835  }
5836 
5837  // Ensure that the sub matrices form a square block matrix.
5838  if(matrix_nrow != matrix_ncol)
5839  {
5840  std::ostringstream error_message;
5841  error_message<<"The number of block rows and block columns\n"
5842  <<"must be the same. Otherwise, call the other\n"
5843  <<"concatenate_without_communication function, passing in\n"
5844  <<"a Vector of distributions describing how to permute the\n"
5845  <<"columns.";
5846  throw OomphLibError(error_message.str(),
5847  OOMPH_CURRENT_FUNCTION,
5848  OOMPH_EXCEPTION_LOCATION);
5849  }
5850 #endif
5851 
5852  concatenate_without_communication(
5853  block_distribution_pt,block_distribution_pt,matrix_pt,result_matrix);
5854  }
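  // Usage sketch (hypothetical names): for a square block system the same
  // Vector of distributions is used for both the rows and the columns, e.g.
  //
  //   CRDoubleMatrixHelpers::concatenate_without_communication(
  //    block_dist_pt,block_pt,result);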
5855 
5856 } // CRDoubleMatrixHelpers
5857 
5858 } // namespace oomph
Definition: matrices.h:511