SNAP Library 3.0, User Reference  2016-07-20 17:56:49
SNAP, a general purpose, high performance system for analysis and manipulation of large networks
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
conv.h
Go to the documentation of this file.
1 #ifndef CONV_H
2 #define CONV_H
3 
4 namespace TSnap {
5 
7 template<class PGraph>
8 PGraph ToGraph(PTable Table, const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
9 {
10  PGraph Graph = PGraph::TObj::New();
11 
12  const TAttrType NodeType = Table->GetColType(SrcCol);
13  Assert(NodeType == Table->GetColType(DstCol));
14  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
15  const TInt DstColIdx = Table->GetColIdx(DstCol);
16 
17  // make single pass over all rows in the table
18  if (NodeType == atInt) {
19  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
20  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
21  // add src and dst nodes to graph if they are not seen earlier
22  TInt SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
23  TInt DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
24  //Using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
25  Graph->AddNodeUnchecked(SVal);
26  Graph->AddNodeUnchecked(DVal);
27  Graph->AddEdgeUnchecked(SVal, DVal);
28  }
29  } else if (NodeType == atFlt) {
30  // node values - i.e. the unique values of src/dst col
31  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
32  THash<TFlt, TInt> FltNodeVals;
33  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
34  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
35  // add src and dst nodes to graph if they are not seen earlier
36  TInt SVal, DVal;
37  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
38  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
39  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
40  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
41  Graph->AddEdge(SVal, DVal);
42  }
43  } else {
44  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
45  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
46  // add src and dst nodes to graph if they are not seen earlier
47  TInt SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
48 // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
49  TInt DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
50 // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
51  //Using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
52  Graph->AddNodeUnchecked(SVal);
53  Graph->AddNodeUnchecked(DVal);
54  Graph->AddEdgeUnchecked(SVal, DVal);
55  }
56  }
57 
58  Graph->SortNodeAdjV();
59  return Graph;
60 }
61 
64 template<class PGraph>
65 PGraph ToNetwork(PTable Table,
66  const TStr& SrcCol, const TStr& DstCol,
67  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
68  TAttrAggr AggrPolicy)
69 {
70  PGraph Graph = PGraph::TObj::New();
71 
72  const TAttrType NodeType = Table->GetColType(SrcCol);
73  Assert(NodeType == Table->GetColType(DstCol));
74  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
75  const TInt DstColIdx = Table->GetColIdx(DstCol);
76 
77  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
78  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
79  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
80 
81  // node values - i.e. the unique values of src/dst col
82  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
83  THash<TFlt, TInt> FltNodeVals;
84 
85  // node attributes
86  THash<TInt, TStrIntVH> NodeIntAttrs;
87  THash<TInt, TStrFltVH> NodeFltAttrs;
88  THash<TInt, TStrStrVH> NodeStrAttrs;
89 
90  // make single pass over all rows in the table
91  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
92  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
93  continue;
94  }
95 
96  // add src and dst nodes to graph if they are not seen earlier
97  TInt SVal, DVal;
98  if (NodeType == atFlt) {
99  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
100  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
101  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
102  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
103  } else if (NodeType == atInt || NodeType == atStr) {
104  if (NodeType == atInt) {
105  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
106  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
107  } else {
108  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
109  if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
110  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
111  if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
112  }
113  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
114  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
115  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
116  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
117  }
118 
119  // add edge and edge attributes
120  Graph->AddEdge(SVal, DVal, CurrRowIdx);
121 
122  // Aggregate edge attributes and add to graph
123  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
124  TStr ColName = EdgeAttrV[i];
125  TAttrType T = Table->GetColType(ColName);
126  TInt Index = Table->GetColIdx(ColName);
127  switch (T) {
128  case atInt:
129  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
130  break;
131  case atFlt:
132  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
133  break;
134  case atStr:
135  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
136  break;
137  }
138  }
139 
140  // get src and dst node attributes into hashmaps
141  if ((Table->SrcNodeAttrV).Len() > 0) {
142  Table->AddNodeAttributes(SVal, Table->SrcNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
143  }
144 
145  if ((Table->DstNodeAttrV).Len() > 0) {
146  Table->AddNodeAttributes(DVal, Table->DstNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
147  }
148  }
149 
150  // aggregate node attributes and add to graph
151  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
152  for (TNEANet::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
153  TInt NId = NodeI.GetId();
154  if (NodeIntAttrs.IsKey(NId)) {
155  TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
156  for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
157  TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
158  Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
159  }
160  }
161  if (NodeFltAttrs.IsKey(NId)) {
162  TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
163  for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
164  TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
165  Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
166  }
167  }
168  if (NodeStrAttrs.IsKey(NId)) {
169  TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
170  for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
171  TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
172  Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
173  }
174  }
175  }
176  }
177 
178  return Graph;
179 }
180 
/// Convenience overload: builds a network from Table using only the node-id
/// columns, with no edge-attribute columns requested (V is empty).
/// NOTE(review): this forwards five arguments, so it relies on a 5-argument
/// ToNetwork overload (taking a single edge-attribute vector) declared
/// elsewhere in this file — confirm it exists; the 7-argument overload above
/// does not match this call.
template<class PGraph>
PGraph ToNetwork(PTable Table,
  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
{
  TStrV V;
  return ToNetwork<PGraph>(Table, SrcCol, DstCol, V, AggrPolicy);
}
189 
190 #ifdef GCC_ATOMIC
/// Parallel (OpenMP) conversion of Table rows into a graph of type PGraphMP.
/// Strategy, in phases:
///   1. copy the src/dst columns into two (src,dst) pairs of flat vectors;
///   2. sort one pair by source and the other by destination;
///   3. locate each distinct node's first offset in both sorted arrays;
///   4. merge the two sorted node lists into one (node, src-idx, dst-idx)
///      vector, then build each node's in/out adjacency from the runs.
/// NOTE(review): the graph is constructed via TNGraphMP::New regardless of
/// the PGraphMP template parameter — confirm only TNGraphMP is intended here.
template<class PGraphMP>
PGraphMP ToGraphMP(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
  // double start = omp_get_wtime();
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);
  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));

  const TInt NumRows = Table->NumValidRows;

  // Two independent copies of the edge list: (SrcCol1,DstCol1) will be sorted
  // by source, (SrcCol2,DstCol2) by destination.
  TIntV SrcCol1, DstCol1, SrcCol2, DstCol2;

  // Allocate the four row-sized vectors concurrently.
  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { SrcCol2.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol2.Reserve(NumRows, NumRows); }
  }

  // double endResize = omp_get_wtime();
  // printf("Resize time = %f\n", endResize-start);

  TIntPrV Partitions;
  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
  // NOTE(review): PartitionSize is computed but never used below.
  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;

  // double endPartition = omp_get_wtime();
  // printf("Partition time = %f\n", endPartition-endResize);

  // Phase 1: copy node-id columns into the flat vectors, one partition per
  // thread. Each row index is written by exactly one thread, so no locking.
  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
        SrcCol2[RowId] = RowI.GetIntAttr(SrcColIdx);
        DstCol1[RowId] = RowI.GetIntAttr(DstColIdx);
        DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    // String columns: the mapped integer ids serve as node ids.
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
        SrcCol2[RowId] = RowI.GetStrMapById(SrcColIdx);
        DstCol1[RowId] = RowI.GetStrMapById(DstColIdx);
        DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
        RowI++;
      }
    }
  }

  // Phase 2: sort the two copies concurrently as two untied tasks —
  // (SrcCol1,DstCol1) keyed by source, (SrcCol2,DstCol2) keyed by destination.
  omp_set_num_threads(omp_get_max_threads());
  #pragma omp parallel
  {
    #pragma omp single nowait
    {
      #pragma omp task untied shared(SrcCol1, DstCol1)
      { TTable::QSortKeyVal(SrcCol1, DstCol1, 0, NumRows-1); }
    }
    #pragma omp single nowait
    {
      #pragma omp task untied shared(SrcCol2, DstCol2)
      { TTable::QSortKeyVal(DstCol2, SrcCol2, 0, NumRows-1); }
    }
    #pragma omp taskwait
  }

  // TTable::PSRSKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
  // TTable::PSRSKeyVal(DstCol2, SrcCol2, 0, NumRows-1);

  // TInt IsS = TTable::CheckSortedKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
  // TInt IsD = TTable::CheckSortedKeyVal(DstCol2, SrcCol2, 0, NumRows-1);
  // printf("IsSorted = %d %d\n", IsS.Val, IsD.Val);

  // double endSort = omp_get_wtime();
  // printf("Sort time = %f\n", endSort-endCopy);
  //return TNGraphMP::New(10, 100);

  TInt NumThreads = omp_get_max_threads();
  TInt PartSize = (NumRows/NumThreads);

  // Phase 3a: choose partition boundaries in the sorted arrays, advancing
  // each boundary so that rows sharing a node id stay in one partition.
  TIntV SrcOffsets, DstOffsets;
  SrcOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
        SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
      CurrOffset++;
    }
    // Drop the boundary if the run spans the whole candidate window.
    if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
  }
  SrcOffsets.Add(NumRows);

  DstOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
        DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
      CurrOffset++;
    }
    if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
  }
  DstOffsets.Add(NumRows);

  TInt SrcPartCnt = SrcOffsets.Len()-1;
  TInt DstPartCnt = DstOffsets.Len()-1;

  // for (TInt i = 0; i < SrcOffsets.Len(); i++) {
  //   printf("%d ", SrcOffsets[i].Val);
  // }
  // printf("\n");
  // for (TInt i = 0; i < DstOffsets.Len(); i++) {
  //   printf("%d ", DstOffsets[i].Val);
  // }
  // printf("\n");

  // Phase 3b: count distinct node ids in every partition (src and dst
  // partitions are processed in one combined parallel loop).
  TIntV SrcNodeCounts, DstNodeCounts;
  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);

  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        SrcNodeCounts[i] = 1;
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          // Skip over the run of rows with the same source node.
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            SrcNodeCounts[i]++;
            CurrNode = SrcCol1[j];
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        DstNodeCounts[i] = 1;
        TInt CurrNode = DstCol2[DstOffsets[i]];
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            DstNodeCounts[i]++;
            CurrNode = DstCol2[j];
          }
        }
      }
    }
  }

  // for (TInt i = 0; i < SrcNodeCounts.Len(); i++) {
  //   printf("%d ", SrcNodeCounts[i].Val);
  // }
  // printf("\n");
  // for (TInt i = 0; i < DstNodeCounts.Len(); i++) {
  //   printf("%d ", DstNodeCounts[i].Val);
  // }
  // printf("\n");

  // Prefix sums: where each partition's distinct nodes start in the id lists.
  TInt TotalSrcNodes = 0;
  TIntV SrcIdOffsets;
  for (int i = 0; i < SrcPartCnt; i++) {
    SrcIdOffsets.Add(TotalSrcNodes);
    TotalSrcNodes += SrcNodeCounts[i];
  }

  TInt TotalDstNodes = 0;
  TIntV DstIdOffsets;
  for (int i = 0; i < DstPartCnt; i++) {
    DstIdOffsets.Add(TotalDstNodes);
    TotalDstNodes += DstNodeCounts[i];
  }

  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);

  // Phase 3c: record (node id, first-row offset) for every distinct node in
  // each sorted array; each thread writes its own disjoint slice.
  TIntPrV SrcNodeIds, DstNodeIds;
  #pragma omp parallel sections
  {
    #pragma omp section
    { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
    #pragma omp section
    { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
  }

  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        TInt ThreadOffset = SrcIdOffsets[i];
        SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            CurrNode = SrcCol1[j];
            SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        TInt CurrNode = DstCol2[DstOffsets[i]];
        TInt ThreadOffset = DstIdOffsets[i];
        DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            CurrNode = DstCol2[j];
            DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    }
  }

  // double endNode = omp_get_wtime();
  // printf("Node time = %f\n", endNode-endSort);

  // Phase 4: merge the two sorted distinct-node lists into
  // (node id, index into SrcNodeIds or -1, index into DstNodeIds or -1).
  TIntTrV Nodes;
  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);

  // double endNodeResize = omp_get_wtime();
  // printf("(NodeResize time = %f)\n", endNodeResize-endNode);

  TInt i = 0, j = 0;
  while (i < TotalSrcNodes && j < TotalDstNodes) {
    if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
      i++;
      j++;
    } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
      i++;
    } else {
      Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
      j++;
    }
  }
  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }

  // double endMerge = omp_get_wtime();
  // printf("Merge time = %f\n", endMerge-endNode);

  TInt NumNodes = Nodes.Len();
  // printf("NumNodes = %d\n", NumNodes.Val);

  PGraphMP Graph = TNGraphMP::New(NumNodes, NumRows);
  // NOTE(review): NumThreads is forced to 1 for the reservation loop below,
  // making it effectively sequential — presumably deliberate; confirm.
  NumThreads = 1;
  int Delta = (NumNodes+NumThreads-1)/NumThreads;

  TVec<TIntV> InVV(NumNodes);
  TVec<TIntV> OutVV(NumNodes);

  // Phase 5a: reserve each node's in/out adjacency vectors. The run length
  // is the distance to the next node's first-row offset (or to the end).
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(static,Delta)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = DstCol1.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      OutVV[m].Reserve(Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = SrcCol2.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      InVV[m].Reserve(Sz);
    }
    //double endTr = omp_get_wtime();
    //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
  }

  // double endAlloc = omp_get_wtime();
  // printf("Alloc time = %f\n", endAlloc-endMerge);

  // Phase 5b: fill the adjacency vectors (duplicates collapsed by
  // CopyUniqueFrom) and insert the nodes with their edges into the graph.
  NumThreads = omp_get_max_threads();
  // NOTE(review): Delta is recomputed but schedule(dynamic) below ignores it.
  Delta = (NumNodes+NumThreads-1)/(10*NumThreads);
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = DstCol1.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      OutVV[m].CopyUniqueFrom(DstCol1, Offset, Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = SrcCol2.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      InVV[m].CopyUniqueFrom(SrcCol2, Offset, Sz);
    }
    Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
    //double endTr = omp_get_wtime();
    //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
  }
  Graph->SetNodes(NumNodes);

  // double endAdd = omp_get_wtime();
  // printf("Add time = %f\n", endAdd-endAlloc);

  return Graph;
}
530 
532 #ifdef GCC_ATOMIC
/// Alternative parallel (OpenMP + GCC atomics) table-to-graph conversion.
/// Estimates the node count with a linear-counting sketch, then repeatedly
/// builds a pre-sized hash-table-backed TNGraphMP, doubling the estimate and
/// retrying whenever the table turns out to be too small.
/// NOTE(review): the graph is built as PNGraphMP/TNGraphMP regardless of the
/// PGraphMP template parameter — confirm only TNGraphMP is intended here.
template<class PGraphMP>
PGraphMP ToGraphMP3(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
  PNGraphMP Graph;
  int MaxThreads = omp_get_max_threads();
  int Length, Threads, Delta, Nodes, Last;
  uint64_t NumNodesEst;
  TInt SrcColIdx, DstColIdx;
  TIntV InVec, OutVec;

  SrcColIdx = Table->GetColIdx(SrcCol);
  DstColIdx = Table->GetColIdx(DstCol);
  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));


  /* Estimate number of nodes in the graph */
  // Linear-counting style estimate: hash dst ids into sz buckets, count the
  // empty buckets, and estimate distinct ids as sz * ln(sz / empty).
  int NumRows = Table->Next.Len();
  double Load = 10;
  int sz = NumRows / Load;
  int *buckets = (int *)malloc(sz * sizeof(int));

  #pragma omp parallel for
  for (int i = 0; i < sz; i++)
    buckets[i] = 0;

  if (NodeType == atInt) {
    #pragma omp parallel for
    for (int i = 0; i < NumRows; i++) {
      int vert = Table->IntCols[DstColIdx][i];
      buckets[vert % sz] = 1;
    }
  }
  else if (NodeType == atStr ) {
    #pragma omp parallel for
    for (int i = 0; i < NumRows; i++) {
      int vert = (Table->StrColMaps)[DstColIdx][i];
      buckets[vert % sz] = 1;
    }
  }
  int cnt = 0;
  #pragma omp parallel for reduction(+:cnt)
  for (int i = 0; i < sz; i++) {
    if (buckets[i] == 0)
      cnt += 1;
  }

  // NOTE(review): if every bucket is occupied, cnt == 0 and this divides by
  // zero — presumably avoided in practice by Load = 10; confirm.
  NumNodesEst = sz * log ((double)sz / cnt);
  free (buckets);

  /* Until we correctly estimate the number of nodes */
  while (1)
  {
    Graph = TNGraphMP::New(NumNodesEst, 100);

    Length = Graph->Reserved();
    Threads = MaxThreads/2;
    Delta = (Length + Threads - 1) / Threads;

    // Per-bucket in/out degree tallies, updated with atomic adds below.
    OutVec.Gen(Length);
    InVec.Gen(Length);

    /* build the node hash table, count the size of edge lists */
    Last = NumRows;
    Nodes = 0;
    omp_set_num_threads(Threads);
    #pragma omp parallel for schedule(static, Delta)
    for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
      if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
        /* need bigger hash table */
        // Can't break out of an OpenMP for; drain the remaining iterations.
        continue;
      }

      TInt SVal, DVal;
      if (NodeType == atInt) {
        SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
        DVal = Table->IntCols[DstColIdx][CurrRowIdx];
      }
      else if (NodeType == atStr ) {
        SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
        DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
      }
      // Hash each endpoint into the graph's reserved bucket space; AddOutEdge1
      // / AddInEdge1 return false when the node is inserted for the first
      // time, in which case the node count is bumped under a critical section.
      int SrcIdx = abs((SVal.GetPrimHashCd()) % Length);
      if (!Graph->AddOutEdge1(SrcIdx, SVal, DVal)) {
        #pragma omp critical
        {
          Nodes++;
        }
      }
      __sync_fetch_and_add(&OutVec[SrcIdx].Val, 1);

      int DstIdx = abs((DVal.GetPrimHashCd()) % Length);
      if (!Graph->AddInEdge1(DstIdx, SVal, DVal)) {
        #pragma omp critical
        {
          Nodes++;
        }
      }
      __sync_fetch_and_add(&InVec[DstIdx].Val, 1);

    }
    if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
      /* We need to double our num nodes estimate */
      Graph.Clr();
      InVec.Clr();
      OutVec.Clr();
      NumNodesEst *= 2;
    }
    else {
      break;
    }
  }

  Graph->SetNodes(Nodes);

  // NOTE(review): Edges is tallied but never used afterwards.
  uint Edges = 0;
  for (int i = 0; i < Length; i++) {
    Edges += OutVec[i] + InVec[i];
  }

  // Pre-size each occupied bucket's adjacency storage from the degree tallies.
  for (int Idx = 0; Idx < Length; Idx++) {
    if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
      Graph->ReserveNodeDegs(Idx, InVec[Idx], OutVec[Idx]);
    }
  }

  /* assign edges */
  Length = Graph->Reserved();
  Threads = MaxThreads;
  Delta = (Length + Threads - 1) / Threads;

  omp_set_num_threads(Threads);
  #pragma omp parallel for schedule(static,Delta)
  for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
    TInt SVal, DVal;
    if (NodeType == atInt) {
      SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
      DVal = Table->IntCols[DstColIdx][CurrRowIdx];
    }
    else if (NodeType == atStr) {
      SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
      DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
    }

    Graph->AddOutEdge2(SVal, DVal);
    Graph->AddInEdge2(SVal, DVal);
  }

  /* sort edges */
  Length = Graph->Reserved();
  Threads = MaxThreads*2;
  Delta = (Length + Threads - 1) / Threads;

  omp_set_num_threads(Threads);
  #pragma omp parallel for schedule(dynamic)
  for (int Idx = 0; Idx < Length; Idx++) {
    if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
      Graph->SortEdges(Idx, InVec[Idx], OutVec[Idx]);
    }
  }

  return Graph;
}
695 
697 template<class PGraphMP>
698 inline PGraphMP ToNetworkMP(PTable Table,
699  const TStr& SrcCol, const TStr& DstCol,
700  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
701  TAttrAggr AggrPolicy) {
703 
705  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
706  const TInt DstColIdx = Table->GetColIdx(DstCol);
707  const TInt NumRows = Table->GetNumValidRows();
708 
709  const TAttrType NodeType = Table->GetColType(SrcCol);
710  Assert(NodeType == Table->GetColType(DstCol));
711 
712 
713  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
714 
715  THash<TInt, TStrIntVH> NodeIntAttrs;
716  THash<TInt, TStrFltVH> NodeFltAttrs;
717  THash<TInt, TStrStrVH> NodeStrAttrs;
718 
719  #pragma omp parallel sections num_threads(4)
720  {
721  #pragma omp section
722  { SrcCol1.Reserve(NumRows, NumRows); }
723  #pragma omp section
724  { EdgeCol1.Reserve(NumRows, NumRows); }
725  #pragma omp section
726  { DstCol2.Reserve(NumRows, NumRows); }
727  #pragma omp section
728  { EdgeCol2.Reserve(NumRows, NumRows); }
729  }
731 
733  TIntPrV Partitions;
734  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
735  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
736 
737  // double endPartition = omp_get_wtime();
738  // printf("Partition time = %f\n", endPartition-endResize);
739 
740  omp_set_num_threads(omp_get_max_threads());
741  if (NodeType == atInt) {
742  #pragma omp parallel for schedule(static)
743  for (int i = 0; i < Partitions.Len(); i++) {
744  TRowIterator RowI(Partitions[i].GetVal1(), Table());
745  TRowIterator EndI(Partitions[i].GetVal2(), Table());
746  while (RowI < EndI) {
747  TInt RowId = RowI.GetRowIdx();
748  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
749  EdgeCol1[RowId] = RowId;
750  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
751  EdgeCol2[RowId] = RowId;
752  RowI++;
753  }
754  }
755  }
756  else if (NodeType == atStr) {
757  #pragma omp parallel for schedule(static)
758  for (int i = 0; i < Partitions.Len(); i++) {
759  TRowIterator RowI(Partitions[i].GetVal1(), Table());
760  TRowIterator EndI(Partitions[i].GetVal2(), Table());
761  while (RowI < EndI) {
762  TInt RowId = RowI.GetRowIdx();
763  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
764  EdgeCol1[RowId] = RowId;
765  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
766  EdgeCol2[RowId] = RowId;
767  RowI++;
768  }
769  }
770  }
772 
773  Sw->Start(TStopwatch::Sort);
774  omp_set_num_threads(omp_get_max_threads());
775  #pragma omp parallel
776  {
777  #pragma omp single nowait
778  {
779  #ifndef GLib_WIN32
780  #pragma omp task untied shared(SrcCol1, EdgeCol1)
781  #endif
782  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
783  }
784  #pragma omp single nowait
785  {
786  #ifndef GLib_WIN32
787  #pragma omp task untied shared(EdgeCol2, DstCol2)
788  #endif
789  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
790  }
791  #ifndef GLib_WIN32
792  #pragma omp taskwait
793  #endif
794  }
795  Sw->Stop(TStopwatch::Sort);
796 
798  TInt NumThreads = omp_get_max_threads();
799  TInt PartSize = (NumRows/NumThreads);
800 
801  // Find the offset of all partitions, each of which contains a list of rows.
802  // Nodes from same sources or destinations are ensured to be kept within same partition.
803  TIntV SrcOffsets, DstOffsets;
804  SrcOffsets.Add(0);
805  for (TInt i = 1; i < NumThreads; i++) {
806  TInt CurrOffset = i * PartSize;
807  while (CurrOffset < (i+1) * PartSize &&
808  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
809  // ensure that rows from the same sources are grouped together
810  CurrOffset++;
811  }
812  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
813  }
814  SrcOffsets.Add(NumRows);
815 
816  DstOffsets.Add(0);
817  for (TInt i = 1; i < NumThreads; i++) {
818  TInt CurrOffset = i * PartSize;
819  while (CurrOffset < (i+1) * PartSize &&
820  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
821  // ensure that rows to the same destinations are grouped together
822  CurrOffset++;
823  }
824  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
825  }
826  DstOffsets.Add(NumRows);
827 
828  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
829  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
830 
831  // count the number of source nodes and destination nodes in each partition
832  TIntV SrcNodeCounts, DstNodeCounts;
833  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
834  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
835 
836  #pragma omp parallel for schedule(dynamic)
837  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
838  if (t < SrcPartCnt) {
839  TInt i = t;
840  if (SrcOffsets[i] != SrcOffsets[i+1]) {
841  SrcNodeCounts[i] = 1;
842  TInt CurrNode = SrcCol1[SrcOffsets[i]];
843  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
844  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
845  if (j < SrcOffsets[i+1]) {
846  SrcNodeCounts[i]++;
847  CurrNode = SrcCol1[j];
848  }
849  }
850  }
851  } else {
852  TInt i = t - SrcPartCnt;
853  if (DstOffsets[i] != DstOffsets[i+1]) {
854  DstNodeCounts[i] = 1;
855  TInt CurrNode = DstCol2[DstOffsets[i]];
856  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
857  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
858  if (j < DstOffsets[i+1]) {
859  DstNodeCounts[i]++;
860  CurrNode = DstCol2[j];
861  }
862  }
863  }
864  }
865  }
866 
867  TInt TotalSrcNodes = 0;
868  TIntV SrcIdOffsets;
869  for (int i = 0; i < SrcPartCnt; i++) {
870  SrcIdOffsets.Add(TotalSrcNodes);
871  TotalSrcNodes += SrcNodeCounts[i];
872  }
873 
874  TInt TotalDstNodes = 0;
875  TIntV DstIdOffsets;
876  for (int i = 0; i < DstPartCnt; i++) {
877  DstIdOffsets.Add(TotalDstNodes);
878  TotalDstNodes += DstNodeCounts[i];
879  }
880 
881  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
882 
883  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
884  TIntPrV SrcNodeIds, DstNodeIds;
885  #pragma omp parallel sections
886  {
887  #pragma omp section
888  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
889  #pragma omp section
890  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
891  }
892 
893  // Find the starting offset of each node (in both src and dst)
894  #pragma omp parallel for schedule(dynamic)
895  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
896  if (t < SrcPartCnt) {
897  TInt i = t;
898  if (SrcOffsets[i] != SrcOffsets[i+1]) {
899  TInt CurrNode = SrcCol1[SrcOffsets[i]];
900  TInt ThreadOffset = SrcIdOffsets[i];
901  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
902  TInt CurrCount = 1;
903  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
904  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
905  if (j < SrcOffsets[i+1]) {
906  CurrNode = SrcCol1[j];
907  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
908  CurrCount++;
909  }
910  }
911  }
912  } else {
913  TInt i = t - SrcPartCnt;
914  if (DstOffsets[i] != DstOffsets[i+1]) {
915  TInt CurrNode = DstCol2[DstOffsets[i]];
916  TInt ThreadOffset = DstIdOffsets[i];
917  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
918  TInt CurrCount = 1;
919  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
920  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
921  if (j < DstOffsets[i+1]) {
922  CurrNode = DstCol2[j];
923  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
924  CurrCount++;
925  }
926  }
927  }
928  }
929  }
930  Sw->Stop(TStopwatch::Group);
931 
933  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
934  TIntTrV Nodes;
935  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
936 
937  TInt i = 0, j = 0;
938  while (i < TotalSrcNodes && j < TotalDstNodes) {
939  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
940  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
941  i++;
942  j++;
943  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
944  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
945  i++;
946  } else {
947  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
948  j++;
949  }
950  }
951  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
952  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
954 
956  TInt NumNodes = Nodes.Len();
957  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
958 // NumThreads = omp_get_max_threads();
959 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
960 
961  TVec<TIntV> InVV(NumNodes);
962  TVec<TIntV> OutVV(NumNodes);
963 
964 // omp_set_num_threads(NumThreads);
965  #pragma omp parallel for schedule(static,100)
966  for (int m = 0; m < NumNodes; m++) {
967  //double startTr = omp_get_wtime();
968  //TIntV OutV, InV;
969  TInt n, i, j;
970  Nodes[m].GetVal(n, i, j);
971  if (i >= 0) {
972  TInt Offset = SrcNodeIds[i].GetVal2();
973  TInt Sz = EdgeCol1.Len()-Offset;
974  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
975  OutVV[m].Reserve(Sz);
976  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
977  }
978  if (j >= 0) {
979  TInt Offset = DstNodeIds[j].GetVal2();
980  TInt Sz = EdgeCol2.Len()-Offset;
981  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
982  InVV[m].Reserve(Sz);
983  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
984  }
985  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
986  }
987  Graph->SetNodes(NumNodes);
989 
991  omp_set_num_threads(omp_get_max_threads());
992  if (NodeType == atInt) {
993  #pragma omp parallel for schedule(static)
994  for (int i = 0; i < Partitions.Len(); i++) {
995  TRowIterator RowI(Partitions[i].GetVal1(), Table());
996  TRowIterator EndI(Partitions[i].GetVal2(), Table());
997  while (RowI < EndI) {
998  TInt RowId = RowI.GetRowIdx(); // EdgeId
999  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
1000  TInt DstId = RowI.GetIntAttr(DstColIdx);
1001  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1002  RowI++;
1003  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1004  TStr ColName = EdgeAttrV[ea_i];
1005  TAttrType T = Table->GetColType(ColName);
1006  TInt Index = Table->GetColIdx(ColName);
1007  switch (T) {
1008  case atInt:
1009  Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
1010  break;
1011  case atFlt:
1012  Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
1013  break;
1014  case atStr:
1015  Graph->AddStrAttrDatE(RowId, Table->GetStrVal(Index, RowId), ColName);
1016  break;
1017  }
1018  }
1019  if ((Table->SrcNodeAttrV).Len() > 0) {
1020  Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1021  }
1022 
1023  if ((Table->DstNodeAttrV).Len() > 0) {
1024  Table->AddNodeAttributes(SrcId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1025  }
1026  }
1027  }
1028  }
1029  else if (NodeType == atStr) {
1030  #pragma omp parallel for schedule(static)
1031  for (int i = 0; i < Partitions.Len(); i++) {
1032  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1033  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1034  while (RowI < EndI) {
1035  TInt RowId = RowI.GetRowIdx(); // EdgeId
1036  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1037  TInt DstId = RowI.GetStrMapById(DstColIdx);
1038  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1039  RowI++;
1040  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1041  TStr ColName = EdgeAttrV[ea_i];
1042  TAttrType T = Table->GetColType(ColName);
1043  TInt Index = Table->GetColIdx(ColName);
1044  switch (T) {
1045  case atInt:
1046  Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
1047  break;
1048  case atFlt:
1049  Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
1050  break;
1051  case atStr:
1052  Graph->AddStrAttrDatE(RowId, Table->GetStrVal(Index, RowId), ColName);
1053  break;
1054  }
1055  }
1056  if ((Table->SrcNodeAttrV).Len() > 0) {
1057  Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1058  }
1059 
1060  if ((Table->DstNodeAttrV).Len() > 0) {
1061  Table->AddNodeAttributes(SrcId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1062  }
1063 
1064  }
1065  }
1066 
1067  }
1068 
1069  // aggregate node attributes and add to graph
1070  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
1071  for (typename PGraphMP::TObj::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
1072  TInt NId = NodeI.GetId();
1073  if (NodeIntAttrs.IsKey(NId)) {
1074  TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
1075  for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
1076  TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
1077  Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
1078  }
1079  }
1080  if (NodeFltAttrs.IsKey(NId)) {
1081  TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
1082  for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
1083  TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
1084  Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
1085  }
1086  }
1087  if (NodeStrAttrs.IsKey(NId)) {
1088  TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
1089  for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
1090  TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
1091  Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
1092  }
1093  }
1094  }
1095  }
1096 
1097 
1098  Graph->SetEdges(NumRows);
1100 
1101  // double endAdd = omp_get_wtime();
1102  // printf("Add time = %f\n", endAdd-endAlloc);
1103 
1104  return Graph;
1105 }
1106 
1108 template<class PGraphMP>
1109 PGraphMP ToNetworkMP(PTable Table,
1110  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
1111 {
1112  TStrV V;
1113  return ToNetworkMP<PGraphMP>(Table, SrcCol, DstCol, V,AggrPolicy);
1114 }
1115 
1116 
1117 
/// Parallel (OpenMP) conversion of Table into a PGraphMP network, using a
/// fixed-width "collector" bucketing strategy to sort and group edges by
/// node id before bulk-inserting nodes and edges.
/// @param Table     input table; one valid row per edge.
/// @param SrcCol    column holding edge sources (int or string-mapped).
/// @param DstCol    column holding edge destinations; must have the same type.
/// @param SrcAttrV  NOTE(review): never read in this body — accepted for
///                  signature parity with other converters; confirm intent.
/// @param DstAttrV  NOTE(review): never read in this body.
/// @param EdgeAttrV NOTE(review): never read in this body — unlike
///                  ToNetworkMP, this variant copies no attributes.
/// @param AggrPolicy NOTE(review): never read in this body.
/// @return the constructed graph with one edge per valid table row.
template<class PGraphMP>
inline PGraphMP ToNetworkMP2(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy) {

  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);
  const TInt NumRows = Table->NumValidRows;

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));


  // Flat copies of the src/dst columns plus a parallel edge-id (row-id)
  // vector for each; allocated concurrently, one vector per section.
  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;

  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol2.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol2.Reserve(NumRows, NumRows); }
  }
  TIntPrV Partitions;
// int NThreads = omp_get_max_threads();
  // Hard-coded partition/thread count (see also NumCollectors below);
  // presumably tuned for a target machine — confirm before reuse.
  const int NThreads = 40;
  Table->GetPartitionRanges(Partitions, NThreads);
  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;

  // double endPartition = omp_get_wtime();
  // printf("Partition time = %f\n", endPartition-endResize);

  // Phase 1: scatter each valid row's src/dst value (int value, or the
  // interned string id for atStr) into the flat vectors, indexed by row id.
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }

  }

// printf("NumRows = %d\n", NumRows.Val);
// printf("NThreads = %d\n", NThreads);
// for (int i = 0; i < Partitions.Len(); i++) {
// printf("Partition %d %d->%d\n", i, Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val);
// }
  // Phase 2: split the row range into NThreads equal chunks; each chunk is
  // sorted independently below.
  int Parts[NThreads+1];
  for (int i = 0; i < NThreads; i++) {
    Parts[i] = NumRows.Val / NThreads * i;
  }
  Parts[NThreads] = NumRows;
// for (int i = 0; i < NThreads+1; i++) {
// printf("Parts[%d] = %d\n", i, Parts[i]);
// }

  Sw->Start(TStopwatch::Sort);
  // ExtremePoints[0]/[1]: first (minimum) src/dst value of each sorted chunk;
  // ExtremePoints[2]/[3]: last (maximum) src/dst value of each sorted chunk.
  TInt ExtremePoints[4][NThreads];
  omp_set_num_threads(omp_get_max_threads());
  #pragma omp parallel
  {
    #pragma omp for schedule(static) nowait
    for (int i = 0; i < NThreads; i++) {
      TInt StartPos = Parts[i];
      TInt EndPos = Parts[i+1]-1;
      // TODO: Handle empty partition
      TTable::QSortKeyVal(SrcCol1, EdgeCol1, StartPos, EndPos);
      ExtremePoints[0][i] = SrcCol1[StartPos];
      ExtremePoints[2][i] = SrcCol1[EndPos];
    }
    #pragma omp for schedule(static) nowait
    for (int i = 0; i < NThreads; i++) {
      TInt StartPos = Parts[i];
      TInt EndPos = Parts[i+1]-1;
      // TODO: Handle empty partition
      TTable::QSortKeyVal(DstCol2, EdgeCol2, StartPos, EndPos);
      ExtremePoints[1][i] = DstCol2[StartPos];
      ExtremePoints[3][i] = DstCol2[EndPos];
    }
  }
// for (int i = 0; i < NThreads; i++) {
// printf("ExtremePoints[%d] = %d-%d -> %d-%d\n", i, ExtremePoints[0][i].Val, ExtremePoints[1][i].Val, ExtremePoints[2][i].Val, ExtremePoints[3][i].Val);
// }

  // find min points
  // Global minimum node id over both src and dst chunks.
  TInt MinId(INT_MAX);
  for (int j = 0; j < 2; j++) {
    for (int i = 0; i < NThreads; i++) {
      if (MinId > ExtremePoints[j][i]) { MinId = ExtremePoints[j][i]; }
    }
  }
  // Global maximum node id over both src and dst chunks.
  TInt MaxId(-1);
  for (int j = 2; j < 4; j++) {
    for (int i = 0; i < NThreads; i++) {
      if (MaxId < ExtremePoints[j][i]) { MaxId = ExtremePoints[j][i]; }
    }
  }
// printf("MinId = %d\n", MinId.Val);
// printf("MaxId = %d\n", MaxId.Val);
  Sw->Stop(TStopwatch::Sort);

  Sw->Start(TStopwatch::Group);
// const int NumCollectors = omp_get_max_threads();
  // Phase 3: divide the node-id range [MinId, MaxId] into NumCollectors
  // contiguous sub-ranges; each collector later gathers and re-sorts all
  // rows whose key falls in its sub-range.
  const int NumCollectors = 20;
  int Range = MaxId.Val - MinId.Val;
  TIntV IdRanges(NumCollectors+1);
  for (int j = 0; j < NumCollectors; j++) {
    IdRanges[j] = MinId + Range/NumCollectors*j;
  }
  IdRanges[NumCollectors] = MaxId+1;
// for (int i = 0; i < NumCollectors+1; i++) {
// printf("IdRanges[%d] = %d\n", i, IdRanges[i].Val);
// }

  // SrcOffsets[i][k]: index within sorted chunk i where collector k's
  // id sub-range begins (chunk is sorted, so ranges are contiguous).
  int SrcOffsets[NThreads][NumCollectors+1];
  #pragma omp parallel for schedule(static)
  for (int i = 0; i < NThreads; i++) {
    int CollectorId = 0;
    for (int j = Parts[i]; j < Parts[i+1]; j++) {
      while (SrcCol1[j] >= IdRanges[CollectorId]) {
        SrcOffsets[i][CollectorId++] = j;
      }
    }
    while (CollectorId <= NumCollectors) {
      SrcOffsets[i][CollectorId++] = Parts[i+1];
    }
  }
  // Same boundaries for the destination-sorted chunks.
  int DstOffsets[NThreads][NumCollectors+1];
  #pragma omp parallel for schedule(static)
  for (int i = 0; i < NThreads; i++) {
    int CollectorId = 0;
    for (int j = Parts[i]; j < Parts[i+1]; j++) {
      while (DstCol2[j] >= IdRanges[CollectorId]) {
        DstOffsets[i][CollectorId++] = j;
      }
    }
    while (CollectorId <= NumCollectors) {
      DstOffsets[i][CollectorId++] = Parts[i+1];
    }
  }
// for (int i = 0; i < NThreads; i++) {
// for (int j = 0; j < NumCollectors+1; j++) {
// printf("SrcOffsets[%d][%d] = %d\n", i, j, SrcOffsets[i][j]);
// }
// }
// for (int i = 0; i < NThreads; i++) {
// for (int j = 0; j < NumCollectors+1; j++) {
// printf("DstOffsets[%d][%d] = %d\n", i, j, DstOffsets[i][j]);
// }
// }

  // Prefix sums: where each collector's gathered rows start in the
  // globally regrouped src/dst arrays.
  TIntV SrcCollectorOffsets(NumCollectors+1);
  SrcCollectorOffsets[0] = 0;
  for (int k = 0; k < NumCollectors; k++) {
    int SumOffset = 0;
    for (int i = 0; i < NThreads; i++) {
      SumOffset += SrcOffsets[i][k+1] - SrcOffsets[i][k];
    }
    SrcCollectorOffsets[k+1] = SrcCollectorOffsets[k] + SumOffset;
  }
  TIntV DstCollectorOffsets(NumCollectors+1);
  DstCollectorOffsets[0] = 0;
  for (int k = 0; k < NumCollectors; k++) {
    int SumOffset = 0;
    for (int i = 0; i < NThreads; i++) {
      SumOffset += DstOffsets[i][k+1] - DstOffsets[i][k];
    }
    DstCollectorOffsets[k+1] = DstCollectorOffsets[k] + SumOffset;
  }
// for (int i = 0; i < NumCollectors+1; i++) {
// printf("SrcCollectorOffsets[%d] = %d\n", i, SrcCollectorOffsets[i].Val);
// }
// for (int i = 0; i < NumCollectors+1; i++) {
// printf("DstCollectorOffsets[%d] = %d\n", i, DstCollectorOffsets[i].Val);
// }

  // Fully regrouped copies: all rows of one collector are contiguous,
  // then sorted, yielding globally-sorted key/edge-id pairs.
  TIntV SrcCol3, EdgeCol3, EdgeCol4, DstCol4;
  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol3.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol3.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol4.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol4.Reserve(NumRows, NumRows); }
  }

  // Phase 4: each collector gathers its rows from every chunk, sorts its
  // own contiguous slice, and counts the distinct node ids in it.
  TIntV SrcNodeCounts(NumCollectors), DstNodeCounts(NumCollectors);
  #pragma omp parallel for schedule(static)
  for (int k = 0; k < NumCollectors; k++) {
    int ind = SrcCollectorOffsets[k];
    for (int i = 0; i < NThreads; i++) {
      for (int j = SrcOffsets[i][k]; j < SrcOffsets[i][k+1]; j++) {
        SrcCol3[ind] = SrcCol1[j];
        EdgeCol3[ind] = EdgeCol1[j];
        ind++;
      }
    }
    TTable::QSortKeyVal(SrcCol3, EdgeCol3, SrcCollectorOffsets[k], SrcCollectorOffsets[k+1]-1);
    int SrcCount = 0;
    if (SrcCollectorOffsets[k+1] > SrcCollectorOffsets[k]) {
      SrcCount = 1;
      for (int j = SrcCollectorOffsets[k]+1; j < SrcCollectorOffsets[k+1]; j++) {
        if (SrcCol3[j] != SrcCol3[j-1]) { SrcCount++; }
      }
    }
    SrcNodeCounts[k] = SrcCount;

    ind = DstCollectorOffsets[k];
    for (int i = 0; i < NThreads; i++) {
      for (int j = DstOffsets[i][k]; j < DstOffsets[i][k+1]; j++) {
        DstCol4[ind] = DstCol2[j];
        EdgeCol4[ind] = EdgeCol2[j];
        ind++;
      }
    }
    TTable::QSortKeyVal(DstCol4, EdgeCol4, DstCollectorOffsets[k], DstCollectorOffsets[k+1]-1);
    int DstCount = 0;
    if (DstCollectorOffsets[k+1] > DstCollectorOffsets[k]) {
      DstCount = 1;
      for (int j = DstCollectorOffsets[k]+1; j < DstCollectorOffsets[k+1]; j++) {
        if (DstCol4[j] != DstCol4[j-1]) { DstCount++; }
      }
    }
    DstNodeCounts[k] = DstCount;
  }

  // Prefix sums over per-collector distinct-node counts: starting slot of
  // each collector in the SrcNodeIds/DstNodeIds vectors.
  TInt TotalSrcNodes = 0;
  TIntV SrcIdOffsets;
  for (int i = 0; i < NumCollectors; i++) {
    SrcIdOffsets.Add(TotalSrcNodes);
    TotalSrcNodes += SrcNodeCounts[i];
  }

// printf("Sorted = %d - %d\n", SrcCol3.IsSorted(), DstCol4.IsSorted());
// for (int i = 0; i < NumRows-1; i++) {
// if (SrcCol3[i] > SrcCol3[i+1]) { printf("i=%d: %d %d\n", i, SrcCol3[i].Val, SrcCol3[i+1].Val); }
// }
// for (int i = 0; i < NumRows-1; i++) {
// if (DstCol4[i] > DstCol4[i+1]) { printf("i=%d: %d %d\n", i, DstCol4[i].Val, DstCol4[i+1].Val); }
// }

  TInt TotalDstNodes = 0;
  TIntV DstIdOffsets;
  for (int i = 0; i < NumCollectors; i++) {
    DstIdOffsets.Add(TotalDstNodes);
    TotalDstNodes += DstNodeCounts[i];
  }

  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
  TIntPrV SrcNodeIds, DstNodeIds;
  #pragma omp parallel sections
  {
    #pragma omp section
    { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
    #pragma omp section
    { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
  }

  // Find the starting offset of each node (in both src and dst)
  // One task per collector per side: tasks t < NumCollectors scan src
  // slices, the rest scan dst slices; inner while skips duplicate keys.
  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < 2*NumCollectors; t++) {
    if (t < NumCollectors) {
      TInt i = t;
      if (SrcCollectorOffsets[i] < SrcCollectorOffsets[i+1]) {
        TInt CurrNode = SrcCol3[SrcCollectorOffsets[i]];
        TInt ThreadOffset = SrcIdOffsets[i];
        SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcCollectorOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = SrcCollectorOffsets[i]+1; j < SrcCollectorOffsets[i+1]; j++) {
          while (j < SrcCollectorOffsets[i+1] && SrcCol3[j] == CurrNode) { j++; }
          if (j < SrcCollectorOffsets[i+1]) {
            CurrNode = SrcCol3[j];
            SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    } else {
      TInt i = t - NumCollectors;
      if (DstCollectorOffsets[i] < DstCollectorOffsets[i+1]) {
        TInt CurrNode = DstCol4[DstCollectorOffsets[i]];
        TInt ThreadOffset = DstIdOffsets[i];
        DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstCollectorOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = DstCollectorOffsets[i]+1; j < DstCollectorOffsets[i+1]; j++) {
          while (j < DstCollectorOffsets[i+1] && DstCol4[j] == CurrNode) { j++; }
          if (j < DstCollectorOffsets[i+1]) {
            CurrNode = DstCol4[j];
            DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    }
  }
  Sw->Stop(TStopwatch::Group);

  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
  // Sorted-merge of SrcNodeIds and DstNodeIds: each entry is
  // (node id, index into SrcNodeIds or -1, index into DstNodeIds or -1).
  TIntTrV Nodes;
  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);

  TInt i = 0, j = 0;
  while (i < TotalSrcNodes && j < TotalDstNodes) {
    if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
      i++;
      j++;
    } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
      i++;
    } else {
      Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
      j++;
    }
  }
  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }

  // Phase 5: bulk-build the graph. Each node is added with its complete
  // de-duplicated in/out edge-id lists, sliced from the sorted edge arrays.
  TInt NumNodes = Nodes.Len();
  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
// NumThreads = omp_get_max_threads();
// int Delta = (NumNodes+NumThreads-1)/NumThreads;

  TVec<TIntV> InVV(NumNodes);
  TVec<TIntV> OutVV(NumNodes);

// omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(static,100)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = EdgeCol3.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      OutVV[m].Reserve(Sz);
      OutVV[m].CopyUniqueFrom(EdgeCol3, Offset, Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = EdgeCol4.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      InVV[m].Reserve(Sz);
      InVV[m].CopyUniqueFrom(EdgeCol4, Offset, Sz);
    }
    Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
  }
  Graph->SetNodes(NumNodes);

  // Phase 6: insert the actual edges (edge id == row id), re-reading
  // endpoints from the table partitions in parallel.
  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // EdgeId
        TInt SrcId = RowI.GetIntAttr(SrcColIdx);
        TInt DstId = RowI.GetIntAttr(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // EdgeId
        TInt SrcId = RowI.GetStrMapById(SrcColIdx);
        TInt DstId = RowI.GetStrMapById(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
      }
    }
  }
  Graph->SetEdges(NumRows);

  // double endAdd = omp_get_wtime();
  // printf("Add time = %f\n", endAdd-endAlloc);

  return Graph;
}
1548 template<class PGraphMP>
1549 PGraphMP ToNetworkMP2(PTable Table,
1550  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
1551 {
1552  TStrV V;
1553  return ToNetworkMP2<PGraphMP>(Table, SrcCol, DstCol, V, V, V, AggrPolicy);
1554 }
1555 #endif // GCC_ATOMIC
1556 
1557 
// Forward declarations: multimodal-network loading routines (defined elsewhere).
// Return values are int status codes — semantics not visible here; confirm
// against the implementations.

/// Loads mode Name into the multimodal network Graph from Table, using NCol
/// as the node column and NodeAttrV as the node-attribute column names.
int LoadModeNetToNet(PMMNet Graph, const TStr& Name, PTable Table, const TStr& NCol,
  TStrV& NodeAttrV);
/// Loads nodes and node attributes from Table directly into the mode network Graph.
int LoadMode(TModeNet& Graph, PTable Table, const TStr& NCol,
  TStrV& NodeAttrV);
/// Loads the crossnet CrossName between modes Mode1 and Mode2 of Graph from
/// Table, using SrcCol/DstCol as endpoints and EdgeAttrV as edge attributes.
int LoadCrossNetToNet(PMMNet Graph, const TStr& Mode1, const TStr& Mode2, const TStr& CrossName,
  PTable Table, const TStr& SrcCol, const TStr& DstCol, TStrV& EdgeAttrV);
/// Loads edges and edge attributes from Table directly into the crossnet Graph.
int LoadCrossNet(TCrossNet& Graph, PTable Table, const TStr& SrcCol, const TStr& DstCol,
  TStrV& EdgeAttrV);
1570 
1571 
1573 template<class PGraph>
1574 PGraph ToNetwork(PTable Table,
1575  const TStr& SrcCol, const TStr& DstCol,
1576  TStrV& EdgeAttrV,
1577  TAttrAggr AggrPolicy) {
1578  PGraph Graph = PGraph::TObj::New();
1579 
1580  const TAttrType NodeType = Table->GetColType(SrcCol);
1581  Assert(NodeType == Table->GetColType(DstCol));
1582  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1583  const TInt DstColIdx = Table->GetColIdx(DstCol);
1584 
1585  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
1586  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
1587  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
1588 
1589  // node values - i.e. the unique values of src/dst col
1590  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
1591  THash<TFlt, TInt> FltNodeVals;
1592 
1593  // make single pass over all rows in the table
1594  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
1595  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
1596  continue;
1597  }
1598 
1599  // add src and dst nodes to graph if they are not seen earlier
1600  TInt SVal, DVal;
1601  if (NodeType == atFlt) {
1602  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
1603  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
1604  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
1605  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
1606  }
1607  else if (NodeType == atInt || NodeType == atStr) {
1608  if (NodeType == atInt) {
1609  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
1610  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
1611  }
1612  else {
1613  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
1614  // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
1615  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
1616  // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
1617  }
1618  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
1619  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
1620  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
1621  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
1622  }
1623 
1624  // add edge and edge attributes
1625  Graph->AddEdge(SVal, DVal, CurrRowIdx);
1626 
1627  // Aggregate edge attributes and add to graph
1628  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
1629  TStr ColName = EdgeAttrV[i];
1630  TAttrType T = Table->GetColType(ColName);
1631  TInt Index = Table->GetColIdx(ColName);
1632  switch (T) {
1633  case atInt:
1634  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
1635  break;
1636  case atFlt:
1637  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
1638  break;
1639  case atStr:
1640  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
1641  break;
1642  }
1643  }
1644  }
1645  return Graph;
1646 
1647 }
1648 
1649 
1650 
1652 template<class PGraphMP>
1653 inline PGraphMP ToNetworkMP(PTable Table,
1654  const TStr& SrcCol, const TStr& DstCol,
1655  TStrV& EdgeAttrV,
1656  TAttrAggr AggrPolicy) {
1658 
1660  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1661  const TInt DstColIdx = Table->GetColIdx(DstCol);
1662  const TInt NumRows = Table->GetNumValidRows();
1663 
1664  const TAttrType NodeType = Table->GetColType(SrcCol);
1665  Assert(NodeType == Table->GetColType(DstCol));
1666 
1667  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
1668 
1669  THash<TInt, TStrIntVH> NodeIntAttrs;
1670  THash<TInt, TStrFltVH> NodeFltAttrs;
1671  THash<TInt, TStrStrVH> NodeStrAttrs;
1672 
1673  #pragma omp parallel sections num_threads(4)
1674  {
1675  #pragma omp section
1676  { SrcCol1.Reserve(NumRows, NumRows); }
1677  #pragma omp section
1678  { EdgeCol1.Reserve(NumRows, NumRows); }
1679  #pragma omp section
1680  { DstCol2.Reserve(NumRows, NumRows); }
1681  #pragma omp section
1682  { EdgeCol2.Reserve(NumRows, NumRows); }
1683  }
1685 
1687  TIntPrV Partitions;
1688  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
1689  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
1690 
1691 
1692  // double endPartition = omp_get_wtime();
1693  // printf("Partition time = %f\n", endPartition-endResize);
1694 
1695  omp_set_num_threads(omp_get_max_threads());
1696  if (NodeType == atInt) {
1697  #pragma omp parallel for schedule(static)
1698  for (int i = 0; i < Partitions.Len(); i++) {
1699  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1700  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1701  while (RowI < EndI) {
1702  TInt RowId = RowI.GetRowIdx();
1703  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
1704  EdgeCol1[RowId] = RowId;
1705  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
1706  EdgeCol2[RowId] = RowId;
1707  RowI++;
1708  }
1709  }
1710  }
1711  else if (NodeType == atStr) {
1712  #pragma omp parallel for schedule(static)
1713  for (int i = 0; i < Partitions.Len(); i++) {
1714  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1715  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1716  while (RowI < EndI) {
1717  TInt RowId = RowI.GetRowIdx();
1718  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
1719  EdgeCol1[RowId] = RowId;
1720  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
1721  EdgeCol2[RowId] = RowId;
1722  RowI++;
1723  }
1724  }
1725  }
1727 
1728  Sw->Start(TStopwatch::Sort);
1729  omp_set_num_threads(omp_get_max_threads());
1730  #pragma omp parallel
1731  {
1732  #pragma omp single nowait
1733  {
1734  #ifndef GLib_WIN32
1735  #pragma omp task untied shared(SrcCol1, EdgeCol1)
1736  #endif
1737  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
1738  }
1739  #pragma omp single nowait
1740  {
1741  #ifndef GLib_WIN32
1742  #pragma omp task untied shared(EdgeCol2, DstCol2)
1743  #endif
1744  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
1745  }
1746  #ifndef GLib_WIN32
1747  #pragma omp taskwait
1748  #endif
1749  }
1750  Sw->Stop(TStopwatch::Sort);
1751 
1752  Sw->Start(TStopwatch::Group);
1753  TInt NumThreads = omp_get_max_threads();
1754  TInt PartSize = (NumRows/NumThreads);
1755 
1756  // Find the offset of all partitions, each of which contains a list of rows.
1757  // Nodes from same sources or destinations are ensured to be kept within same partition.
1758  TIntV SrcOffsets, DstOffsets;
1759  SrcOffsets.Add(0);
1760  for (TInt i = 1; i < NumThreads; i++) {
1761  TInt CurrOffset = i * PartSize;
1762  while (CurrOffset < (i+1) * PartSize &&
1763  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
1764  // ensure that rows from the same sources are grouped together
1765  CurrOffset++;
1766  }
1767  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
1768  }
1769  SrcOffsets.Add(NumRows);
1770 
1771  DstOffsets.Add(0);
1772  for (TInt i = 1; i < NumThreads; i++) {
1773  TInt CurrOffset = i * PartSize;
1774  while (CurrOffset < (i+1) * PartSize &&
1775  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
1776  // ensure that rows to the same destinations are grouped together
1777  CurrOffset++;
1778  }
1779  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
1780  }
1781  DstOffsets.Add(NumRows);
1782 
1783  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
1784  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
1785 
1786  // count the number of source nodes and destination nodes in each partition
1787  TIntV SrcNodeCounts, DstNodeCounts;
1788  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
1789  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
1790 
1791  #pragma omp parallel for schedule(dynamic)
1792  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1793  if (t < SrcPartCnt) {
1794  TInt i = t;
1795  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1796  SrcNodeCounts[i] = 1;
1797  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1798  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1799  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1800  if (j < SrcOffsets[i+1]) {
1801  SrcNodeCounts[i]++;
1802  CurrNode = SrcCol1[j];
1803  }
1804  }
1805  }
1806  } else {
1807  TInt i = t - SrcPartCnt;
1808  if (DstOffsets[i] != DstOffsets[i+1]) {
1809  DstNodeCounts[i] = 1;
1810  TInt CurrNode = DstCol2[DstOffsets[i]];
1811  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1812  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1813  if (j < DstOffsets[i+1]) {
1814  DstNodeCounts[i]++;
1815  CurrNode = DstCol2[j];
1816  }
1817  }
1818  }
1819  }
1820  }
1821 
1822  TInt TotalSrcNodes = 0;
1823  TIntV SrcIdOffsets;
1824  for (int i = 0; i < SrcPartCnt; i++) {
1825  SrcIdOffsets.Add(TotalSrcNodes);
1826  TotalSrcNodes += SrcNodeCounts[i];
1827  }
1828 
1829  TInt TotalDstNodes = 0;
1830  TIntV DstIdOffsets;
1831  for (int i = 0; i < DstPartCnt; i++) {
1832  DstIdOffsets.Add(TotalDstNodes);
1833  TotalDstNodes += DstNodeCounts[i];
1834  }
1835 
1836  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
1837 
1838  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
1839  TIntPrV SrcNodeIds, DstNodeIds;
1840  #pragma omp parallel sections
1841  {
1842  #pragma omp section
1843  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
1844  #pragma omp section
1845  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
1846  }
1847 
1848  // Find the starting offset of each node (in both src and dst)
1849  #pragma omp parallel for schedule(dynamic)
1850  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1851  if (t < SrcPartCnt) {
1852  TInt i = t;
1853  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1854  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1855  TInt ThreadOffset = SrcIdOffsets[i];
1856  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
1857  TInt CurrCount = 1;
1858  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1859  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1860  if (j < SrcOffsets[i+1]) {
1861  CurrNode = SrcCol1[j];
1862  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1863  CurrCount++;
1864  }
1865  }
1866  }
1867  } else {
1868  TInt i = t - SrcPartCnt;
1869  if (DstOffsets[i] != DstOffsets[i+1]) {
1870  TInt CurrNode = DstCol2[DstOffsets[i]];
1871  TInt ThreadOffset = DstIdOffsets[i];
1872  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
1873  TInt CurrCount = 1;
1874  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1875  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1876  if (j < DstOffsets[i+1]) {
1877  CurrNode = DstCol2[j];
1878  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1879  CurrCount++;
1880  }
1881  }
1882  }
1883  }
1884  }
1885  Sw->Stop(TStopwatch::Group);
1886 
1888  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
1889  TIntTrV Nodes;
1890  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
1891 
1892  TInt i = 0, j = 0;
1893  while (i < TotalSrcNodes && j < TotalDstNodes) {
1894  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
1895  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
1896  i++;
1897  j++;
1898  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
1899  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
1900  i++;
1901  } else {
1902  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
1903  j++;
1904  }
1905  }
1906  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
1907  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
1909 
1911  TInt NumNodes = Nodes.Len();
1912  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
1913 // NumThreads = omp_get_max_threads();
1914 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
1915 
1916  TVec<TIntV> InVV(NumNodes);
1917  TVec<TIntV> OutVV(NumNodes);
1918 
1919 // omp_set_num_threads(NumThreads);
1920  #pragma omp parallel for schedule(static,100)
1921  for (int m = 0; m < NumNodes; m++) {
1922  //double startTr = omp_get_wtime();
1923  //TIntV OutV, InV;
1924  TInt n, i, j;
1925  Nodes[m].GetVal(n, i, j);
1926  if (i >= 0) {
1927  TInt Offset = SrcNodeIds[i].GetVal2();
1928  TInt Sz = EdgeCol1.Len()-Offset;
1929  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
1930  OutVV[m].Reserve(Sz);
1931  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
1932  }
1933  if (j >= 0) {
1934  TInt Offset = DstNodeIds[j].GetVal2();
1935  TInt Sz = EdgeCol2.Len()-Offset;
1936  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
1937  InVV[m].Reserve(Sz);
1938  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
1939  }
1940  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
1941  }
1942  Graph->SetNodes(NumNodes);
1944 
1946  omp_set_num_threads(omp_get_max_threads());
1947  if (NodeType == atInt) {
1948  #pragma omp parallel for schedule(static)
1949  for (int i = 0; i < Partitions.Len(); i++) {
1950  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1951  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1952  while (RowI < EndI) {
1953  TInt RowId = RowI.GetRowIdx(); // EdgeId
1954  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
1955  TInt DstId = RowI.GetIntAttr(DstColIdx);
1956  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1957  RowI++;
1958  }
1959  }
1960  }
1961  else if (NodeType == atStr) {
1962  #pragma omp parallel for schedule(static)
1963  for (int i = 0; i < Partitions.Len(); i++) {
1964  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1965  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1966  while (RowI < EndI) {
1967  TInt RowId = RowI.GetRowIdx(); // EdgeId
1968  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1969  TInt DstId = RowI.GetStrMapById(DstColIdx);
1970  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1971  RowI++;
1972  }
1973  }
1974 
1975  }
1976 
1977 
1978  Graph->SetEdges(NumRows);
1979  Graph->SetMxEId(NumRows);
1981 
1982 
1983  // make single pass over all rows in the table to add attributes
1984  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
1985  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
1986  continue;
1987  }
1988  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1989  TStr ColName = EdgeAttrV[ea_i];
1990  TAttrType T = Table->GetColType(ColName);
1991  TInt Index = Table->GetColIdx(ColName);
1992  switch (T) {
1993  case atInt:
1994  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
1995  break;
1996  case atFlt:
1997  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
1998  break;
1999  case atStr:
2000  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
2001  break;
2002  }
2003  }
2004  }
2005  // double endAdd = omp_get_wtime();
2006  // printf("Add time = %f\n", endAdd-endAlloc);
2007 
2008  return Graph;
2009 }
2010 
2011 
2013 template<class PGraph>
2014 PGraph ToNetwork(PTable Table,
2015  const TStr& SrcCol, const TStr& DstCol,
2016  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2017  TAttrAggr AggrPolicy) {
2018  PGraph Graph = PGraph::TObj::New();
2019 
2020  const TAttrType NodeType = Table->GetColType(SrcCol);
2021  Assert(NodeType == Table->GetColType(DstCol));
2022  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2023  const TInt DstColIdx = Table->GetColIdx(DstCol);
2024 
2025 
2026  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2027  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
2028  THash<TInt, TStrIntVH> NodeIntAttrs;
2029  THash<TInt, TStrFltVH> NodeFltAttrs;
2030  THash<TInt, TStrStrVH> NodeStrAttrs;
2031 
2032 
2033  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
2034  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
2035  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
2036 
2037  // node values - i.e. the unique values of src/dst col
2038  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
2039  THash<TFlt, TInt> FltNodeVals;
2040 
2041  // make single pass over all rows in the table
2042  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2043  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2044  continue;
2045  }
2046 
2047  // add src and dst nodes to graph if they are not seen earlier
2048  TInt SVal, DVal;
2049  if (NodeType == atFlt) {
2050  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
2051  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
2052  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
2053  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
2054  }
2055  else if (NodeType == atInt || NodeType == atStr) {
2056  if (NodeType == atInt) {
2057  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
2058  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
2059  }
2060  else {
2061  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
2062  // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
2063  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
2064  // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
2065  }
2066  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
2067  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
2068  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
2069  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
2070  }
2071 
2072  // add edge and edge attributes
2073  Graph->AddEdge(SVal, DVal, CurrRowIdx);
2074 
2075  // Aggregate edge attributes and add to graph
2076  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
2077  TStr ColName = EdgeAttrV[i];
2078  TAttrType T = Table->GetColType(ColName);
2079  TInt Index = Table->GetColIdx(ColName);
2080  switch (T) {
2081  case atInt:
2082  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2083  break;
2084  case atFlt:
2085  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2086  break;
2087  case atStr:
2088  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
2089  break;
2090  }
2091  }
2092  }
2093 
2094 
2095  //Add node attribtes
2096  if (NodeAttrV.Len() > 0) {
2097  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2098  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2099  continue;
2100  }
2101  TInt NId;
2102  if (NodeTypeN == atInt) {
2103  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2104  }
2105  else if (NodeTypeN == atStr){
2106  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2107  }
2108  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2109  TStr ColName = NodeAttrV[i];
2110  TAttrType T = NodeTable->GetColType(ColName);
2111  TInt Index = NodeTable->GetColIdx(ColName);
2112  switch (T) {
2113  case atInt:
2114  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2115  break;
2116  case atFlt:
2117  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2118  break;
2119  case atStr:
2120  Graph->AddStrAttrDatN(NId, NodeTable->GetStrVal(Index, CurrRowIdx), ColName);
2121  break;
2122  }
2123  }
2124  }
2125  }
2126 
2127  return Graph;
2128 
2129 }
2130 
2131 
// Parallel (OpenMP, sort-first) table-to-network conversion: one edge per
// valid table row (edge id == row id); endpoints come from SrcCol/DstCol,
// which must be int- or string-typed (string values use their integer
// string-pool ids). Edge attributes in EdgeAttrV are copied from Table;
// node attributes in NodeAttrV come from NodeTable, keyed by NodeCol.
// NOTE(review): AggrPolicy is unused in the visible body -- later rows
// simply overwrite earlier attribute values. This extract elides some
// original lines (numbering gaps); in particular the declaration of the
// stopwatch `Sw` used below is not visible here -- presumably a TStopwatch
// instance obtained earlier in the function. Confirm against the full file.
2133 template<class PGraphMP>
2134 inline PGraphMP ToNetworkMP(PTable Table,
2135  const TStr& SrcCol, const TStr& DstCol,
2136  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2137  TAttrAggr AggrPolicy) {
2139 
2141  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2142  const TInt DstColIdx = Table->GetColIdx(DstCol);
2143  const TInt NumRows = Table->GetNumValidRows();
2144 
2145  const TAttrType NodeType = Table->GetColType(SrcCol);
2146  Assert(NodeType == Table->GetColType(DstCol));
2147 
2148 
2149  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
2150 
2151  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2152  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
// NOTE(review): the three hashes below are never referenced in the visible
// body of this function.
2153  THash<TInt, TStrIntVH> NodeIntAttrs;
2154  THash<TInt, TStrFltVH> NodeFltAttrs;
2155  THash<TInt, TStrStrVH> NodeStrAttrs;
2156 
// Size the four working arrays up front; Reserve(n, n) also sets the length,
// so the operator[] stores below are valid.
2157  #pragma omp parallel sections num_threads(4)
2158  {
2159  #pragma omp section
2160  { SrcCol1.Reserve(NumRows, NumRows); }
2161  #pragma omp section
2162  { EdgeCol1.Reserve(NumRows, NumRows); }
2163  #pragma omp section
2164  { DstCol2.Reserve(NumRows, NumRows); }
2165  #pragma omp section
2166  { EdgeCol2.Reserve(NumRows, NumRows); }
2167  }
2169 
// Split the table rows into per-thread ranges; each thread fills disjoint
// slots (indexed by row id) of the four arrays above.
2171  TIntPrV Partitions;
2172  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
2173  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2174 
2175  // double endPartition = omp_get_wtime();
2176  // printf("Partition time = %f\n", endPartition-endResize);
2177 
2178  omp_set_num_threads(omp_get_max_threads());
2179  if (NodeType == atInt) {
2180  #pragma omp parallel for schedule(static)
2181  for (int i = 0; i < Partitions.Len(); i++) {
2182  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2183  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2184  while (RowI < EndI) {
2185  TInt RowId = RowI.GetRowIdx();
2186  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
2187  EdgeCol1[RowId] = RowId;
2188  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
2189  EdgeCol2[RowId] = RowId;
2190  RowI++;
2191  }
2192  }
2193  }
2194  else if (NodeType == atStr) {
2195  #pragma omp parallel for schedule(static)
2196  for (int i = 0; i < Partitions.Len(); i++) {
2197  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2198  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2199  while (RowI < EndI) {
2200  TInt RowId = RowI.GetRowIdx();
2201  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
2202  EdgeCol1[RowId] = RowId;
2203  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
2204  EdgeCol2[RowId] = RowId;
2205  RowI++;
2206  }
2207  }
2208  }
2210 
// Sort (SrcCol1, EdgeCol1) by source id and (DstCol2, EdgeCol2) by
// destination id, as two concurrent tasks (plain sections on Windows,
// where the task pragma is compiled out).
2211  Sw->Start(TStopwatch::Sort);
2212  omp_set_num_threads(omp_get_max_threads());
2213  #pragma omp parallel
2214  {
2215  #pragma omp single nowait
2216  {
2217  #ifndef GLib_WIN32
2218  #pragma omp task untied shared(SrcCol1, EdgeCol1)
2219  #endif
2220  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
2221  }
2222  #pragma omp single nowait
2223  {
2224  #ifndef GLib_WIN32
2225  #pragma omp task untied shared(EdgeCol2, DstCol2)
2226  #endif
2227  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
2228  }
2229  #ifndef GLib_WIN32
2230  #pragma omp taskwait
2231  #endif
2232  }
2233  Sw->Stop(TStopwatch::Sort);
2234 
2235  Sw->Start(TStopwatch::Group);
2236  TInt NumThreads = omp_get_max_threads();
2237  TInt PartSize = (NumRows/NumThreads);
2238 
2239  // Find the offset of all partitions, each of which contains a list of rows.
2240  // Nodes from same sources or destinations are ensured to be kept within same partition.
2241  TIntV SrcOffsets, DstOffsets;
2242  SrcOffsets.Add(0);
2243  for (TInt i = 1; i < NumThreads; i++) {
2244  TInt CurrOffset = i * PartSize;
// Advance past a run of equal source ids so the run is not split across
// two partitions; a boundary inside a later chunk drops this partition.
2245  while (CurrOffset < (i+1) * PartSize &&
2246  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
2247  // ensure that rows from the same sources are grouped together
2248  CurrOffset++;
2249  }
2250  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
2251  }
2252  SrcOffsets.Add(NumRows);
2253 
2254  DstOffsets.Add(0);
2255  for (TInt i = 1; i < NumThreads; i++) {
2256  TInt CurrOffset = i * PartSize;
2257  while (CurrOffset < (i+1) * PartSize &&
2258  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
2259  // ensure that rows to the same destinations are grouped together
2260  CurrOffset++;
2261  }
2262  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
2263  }
2264  DstOffsets.Add(NumRows);
2265 
2266  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
2267  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
2268 
2269  // count the number of source nodes and destination nodes in each partition
2270  TIntV SrcNodeCounts, DstNodeCounts;
2271  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
2272  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
2273 
// One task per partition (source partitions first, then destination ones);
// counts runs of equal ids in the sorted columns, i.e. distinct node ids.
2274  #pragma omp parallel for schedule(dynamic)
2275  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2276  if (t < SrcPartCnt) {
2277  TInt i = t;
2278  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2279  SrcNodeCounts[i] = 1;
2280  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2281  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2282  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2283  if (j < SrcOffsets[i+1]) {
2284  SrcNodeCounts[i]++;
2285  CurrNode = SrcCol1[j];
2286  }
2287  }
2288  }
2289  } else {
2290  TInt i = t - SrcPartCnt;
2291  if (DstOffsets[i] != DstOffsets[i+1]) {
2292  DstNodeCounts[i] = 1;
2293  TInt CurrNode = DstCol2[DstOffsets[i]];
2294  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2295  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2296  if (j < DstOffsets[i+1]) {
2297  DstNodeCounts[i]++;
2298  CurrNode = DstCol2[j];
2299  }
2300  }
2301  }
2302  }
2303  }
2304 
// Prefix sums: where each partition's distinct nodes start in
// SrcNodeIds/DstNodeIds below.
2305  TInt TotalSrcNodes = 0;
2306  TIntV SrcIdOffsets;
2307  for (int i = 0; i < SrcPartCnt; i++) {
2308  SrcIdOffsets.Add(TotalSrcNodes);
2309  TotalSrcNodes += SrcNodeCounts[i];
2310  }
2311 
2312  TInt TotalDstNodes = 0;
2313  TIntV DstIdOffsets;
2314  for (int i = 0; i < DstPartCnt; i++) {
2315  DstIdOffsets.Add(TotalDstNodes);
2316  TotalDstNodes += DstNodeCounts[i];
2317  }
2318 
2319  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
2320 
2321  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
2322  TIntPrV SrcNodeIds, DstNodeIds;
2323  #pragma omp parallel sections
2324  {
2325  #pragma omp section
2326  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
2327  #pragma omp section
2328  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
2329  }
2330 
2331  // Find the starting offset of each node (in both src and dst)
2332  #pragma omp parallel for schedule(dynamic)
2333  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2334  if (t < SrcPartCnt) {
2335  TInt i = t;
2336  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2337  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2338  TInt ThreadOffset = SrcIdOffsets[i];
2339  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
2340  TInt CurrCount = 1;
2341  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2342  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2343  if (j < SrcOffsets[i+1]) {
2344  CurrNode = SrcCol1[j];
2345  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2346  CurrCount++;
2347  }
2348  }
2349  }
2350  } else {
2351  TInt i = t - SrcPartCnt;
2352  if (DstOffsets[i] != DstOffsets[i+1]) {
2353  TInt CurrNode = DstCol2[DstOffsets[i]];
2354  TInt ThreadOffset = DstIdOffsets[i];
2355  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
2356  TInt CurrCount = 1;
2357  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2358  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2359  if (j < DstOffsets[i+1]) {
2360  CurrNode = DstCol2[j];
2361  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2362  CurrCount++;
2363  }
2364  }
2365  }
2366  }
2367  }
2368  Sw->Stop(TStopwatch::Group);
2369 
2371  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
// Merge the two sorted distinct-id lists; each entry is
// (node id, index into SrcNodeIds or -1, index into DstNodeIds or -1).
2372  TIntTrV Nodes;
2373  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
2374 
2375  TInt i = 0, j = 0;
2376  while (i < TotalSrcNodes && j < TotalDstNodes) {
2377  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
2378  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
2379  i++;
2380  j++;
2381  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
2382  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
2383  i++;
2384  } else {
2385  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
2386  j++;
2387  }
2388  }
2389  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
2390  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
2392 
2394  TInt NumNodes = Nodes.Len();
2395  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
2396 // NumThreads = omp_get_max_threads();
2397 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
2398 
2399  TVec<TIntV> InVV(NumNodes);
2400  TVec<TIntV> OutVV(NumNodes);
2401 
// For each node, gather its out-edge row ids (rows where it is the source)
// and in-edge row ids (rows where it is the destination), de-duplicated via
// CopyUniqueFrom, then add the node with both adjacency lists.
2402 // omp_set_num_threads(NumThreads);
2403  #pragma omp parallel for schedule(static,100)
2404  for (int m = 0; m < NumNodes; m++) {
2405  //double startTr = omp_get_wtime();
2406  //TIntV OutV, InV;
2407  TInt n, i, j;
2408  Nodes[m].GetVal(n, i, j);
2409  if (i >= 0) {
2410  TInt Offset = SrcNodeIds[i].GetVal2();
2411  TInt Sz = EdgeCol1.Len()-Offset;
2412  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
2413  OutVV[m].Reserve(Sz);
2414  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
2415  }
2416  if (j >= 0) {
2417  TInt Offset = DstNodeIds[j].GetVal2();
2418  TInt Sz = EdgeCol2.Len()-Offset;
2419  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
2420  InVV[m].Reserve(Sz);
2421  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
2422  }
2423  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
2424  }
2425  Graph->SetNodes(NumNodes);
2427 
// Second pass over the table: insert every valid row as an edge, with the
// row id serving as the edge id.
2429  omp_set_num_threads(omp_get_max_threads());
2430  if (NodeType == atInt) {
2431  #pragma omp parallel for schedule(static)
2432  for (int i = 0; i < Partitions.Len(); i++) {
2433  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2434  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2435  while (RowI < EndI) {
2436  TInt RowId = RowI.GetRowIdx(); // EdgeId
2437  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
2438  TInt DstId = RowI.GetIntAttr(DstColIdx);
2439  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2440  RowI++;
2441  }
2442  }
2443  }
2444  else if (NodeType == atStr) {
2445  #pragma omp parallel for schedule(static)
2446  for (int i = 0; i < Partitions.Len(); i++) {
2447  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2448  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2449  while (RowI < EndI) {
2450  TInt RowId = RowI.GetRowIdx(); // EdgeId
2451  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
2452  TInt DstId = RowI.GetStrMapById(DstColIdx);
2453  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2454  RowI++;
2455  }
2456  }
2457 
2458  }
2459 
2460 
2461  Graph->SetEdges(NumRows);
2462  Graph->SetMxEId(NumRows);
2464 
2465 
2466  // make single pass over all rows in the table to add attributes
2467  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2468  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2469  continue;
2470  }
2471  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
2472  TStr ColName = EdgeAttrV[ea_i];
2473  TAttrType T = Table->GetColType(ColName);
2474  TInt Index = Table->GetColIdx(ColName);
2475  switch (T) {
2476  case atInt:
2477  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2478  break;
2479  case atFlt:
2480  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2481  break;
2482  case atStr:
2483  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
2484  break;
2485  }
2486  }
2487  }
2488 
2489 
2490  //Add node attributes from NodeTable, keyed by the NodeCol value
2491  if (NodeAttrV.Len() > 0) {
2492  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2493  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2494  continue;
2495  }
2496  TInt NId;
2497  if (NodeTypeN == atInt) {
2498  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2499  }
2500  else if (NodeTypeN == atStr){
2501  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2502  }
2503  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2504  TStr ColName = NodeAttrV[i];
2505  TAttrType T = NodeTable->GetColType(ColName);
2506  TInt Index = NodeTable->GetColIdx(ColName);
2507  switch (T) {
2508  case atInt:
2509  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2510  break;
2511  case atFlt:
2512  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2513  break;
2514  case atStr:
2515  Graph->AddStrAttrDatN(NId, NodeTable->GetStrVal(Index, CurrRowIdx), ColName);
2516  break;
2517  }
2518  }
2519  }
2520  }
2521  // double endAdd = omp_get_wtime();
2522  // printf("Add time = %f\n", endAdd-endAlloc);
2523 
2524  return Graph;
2525 }
2526 #endif // GCC_ATOMIC
2527 
2528 }; // TSnap namespace
2529 
2530 // TODO tidy up GCC_ATOMIC directives
2531 
2532 
2533 #endif // CONV_H
2534 
int GetPrimHashCd() const
Definition: dt.h:1078
TPair< TInt, TInt > TIntPr
Definition: ds.h:83
enum TAttrType_ TAttrType
Types for tables, sparse and dense attributes.
int LoadModeNetToNet(PMMNet Graph, const TStr &Name, PTable Table, const TStr &NCol, TStrV &NodeAttrV)
Loads a mode, with name Name, into the PMMNet from the TTable. NCol specifies the node id column and ...
Definition: conv.cpp:6
TInt GetIntAttr(TInt ColIdx) const
Returns value of integer attribute specified by integer column index for current row.
Definition: table.cpp:155
int Val
Definition: dt.h:1046
PGraphMP ToGraphMP(PTable Table, const TStr &SrcCol, const TStr &DstCol)
Performs table to graph conversion in parallel using the sort-first algorithm. This is the recommende...
Definition: conv.h:193
unsigned int uint
Definition: bd.h:11
TIter BegI() const
Definition: hash.h:171
TSizeTy Len() const
Returns the number of elements in the vector.
Definition: ds.h:547
static PNGraphMP New()
Static constructor that returns a pointer to the graph. Call: PNGraphMP Graph = TNGraphMP::New().
Definition: graphmp.h:138
void Start(const TExperiment Exp)
Start a new experiment.
Definition: util.cpp:733
TAttrAggr
Possible policies for aggregating node attributes.
Definition: table.h:266
const TDat & GetDat(const TKey &Key) const
Definition: hash.h:220
Node iterator. Only forward iteration (operator++) is supported.
Definition: network.h:1632
TIter EndI() const
Definition: hash.h:176
void Clr()
Definition: bd.h:502
void CopyUniqueFrom(TVec< TVal, TSizeTy > &Vec, TInt Offset, TInt Sz)
Copy Sz values from Vec starting at Offset.
Definition: ds.h:1029
Definition: gbase.h:23
Definition: dt.h:1293
void Stop(const TExperiment Exp)
Stop the current experiment.
Definition: util.cpp:737
Iterator class for TTable rows.
Definition: table.h:339
const TVal & GetVal(const TSizeTy &ValN) const
Returns a reference to the element at position ValN in the vector.
Definition: ds.h:621
int LoadCrossNetToNet(PMMNet Graph, const TStr &Mode1, const TStr &Mode2, const TStr &CrossName, PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &EdgeAttrV)
Loads a crossnet from Mode1 to Mode2, with name CrossName, from the provided TTable. EdgeAttrV specifies edge attributes.
Definition: conv.cpp:60
void Clr(const bool &DoDel=true, const TSizeTy &NoDelLim=-1)
Clears the contents of the vector.
Definition: ds.h:971
PGraph ToGraph(PTable Table, const TStr &SrcCol, const TStr &DstCol, TAttrAggr AggrPolicy)
Sequentially converts the table into a graph with links from nodes in SrcCol to those in DstCol...
Definition: conv.h:8
int LoadCrossNet(TCrossNet &Graph, PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &EdgeAttrV)
Loads the edges from the TTable and EdgeAttrV specifies columns containing edge attributes.
Definition: conv.cpp:69
PGraphMP ToNetworkMP(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Does Table to Network conversion in parallel using the sort-first algorithm. This is the recommended ...
Definition: conv.h:698
#define Assert(Cond)
Definition: bd.h:251
int LoadMode(TModeNet &Graph, PTable Table, const TStr &NCol, TStrV &NodeAttrV)
Loads the nodes specified in column NCol from the TTable with the attributes specified in NodeAttrV...
Definition: conv.cpp:14
TInt GetRowIdx() const
Gets the id of the row pointed by this iterator.
Definition: table.cpp:151
TInt GetStrMapById(TInt ColIdx) const
Returns integer mapping of a string attribute value specified by string column index for current row...
Definition: table.cpp:186
static void QSortKeyVal(TIntV &Key, TIntV &Val, TInt Start, TInt End)
Definition: table.cpp:5355
The nodes of one particular mode in a TMMNet, and their neighbor vectors as TIntV attributes...
Definition: mmnet.h:23
Definition: dt.h:1044
PGraphMP ToGraphMP3(PTable Table, const TStr &SrcCol, const TStr &DstCol)
Performs table to graph conversion in parallel. Uses the hash-first method, which is less optimal...
Definition: conv.h:534
Definition: dt.h:412
Definition: hash.h:88
Definition: gbase.h:23
Definition: bd.h:196
TTriple< TInt, TInt, TInt > TIntTr
Definition: ds.h:170
void Gen(const TSizeTy &_Vals)
Constructs a vector (an array) of _Vals elements.
Definition: ds.h:495
void Reserve(const TSizeTy &_MxVals)
Reserves enough memory for the vector to store _MxVals elements.
Definition: ds.h:515
static TStopwatch * GetInstance()
Definition: util.h:82
Definition: gbase.h:23
bool IsKey(const TKey &Key) const
Definition: hash.h:216
TSizeTy Add()
Adds a new element at the end of the vector, after its current last element.
Definition: ds.h:574
PGraph ToNetwork(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Converts table to a network. Suitable for PNEANet - Requires node and edge attribute column names as ...
Definition: conv.h:65
PGraphMP ToNetworkMP2(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Implements table to network conversion in parallel. Not the recommended algorithm; use ToNetworkMP instead.
Definition: conv.h:1120
Implements a single CrossNet consisting of edges between two TModeNets (could be the same TModeNet) ...
Definition: mmnet.h:124
Routines to benchmark table operations.
Definition: util.h:71