
Commit dd52448

#420 Add support for FB format with BDW only.

1 parent b9ba931

File tree

6 files changed: +55 −31 lines changed

README.md

Lines changed: 14 additions & 1 deletion

@@ -326,8 +326,21 @@ the record defined by the copybook. But you can specify the record length explicitly
 ```
 
 Fixed block record formats (`FB`) are also supported. The support is _experimental_, if you find any issues, please
-let us know. When the record format is 'FB' you need to specify either block length or number of records per
+let us know. When the record format is 'FB' you can specify block length or number of records per
 block. As with 'F' if `record_length` is not specified, it will be determined from the copybook.
+
+Records that have BDWs, but not RDWs, can be read like this:
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+```
+or simply
+```
+.option("record_format", "FB")
+```
+
+Records that have neither BDWs nor RDWs can be read like this:
+
 ```
 .option("record_format", "FB")
 .option("record_length", "250")

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala

Lines changed: 0 additions & 3 deletions

@@ -24,9 +24,6 @@ case class FixedBlockParameters(
 
 object FixedBlockParameters {
   def validate(params: FixedBlockParameters): Unit = {
-    if (params.blockLength.isEmpty && params.recordsPerBlock.isEmpty) {
-      throw new IllegalArgumentException("FB record format requires block length or number records per block to be specified.")
-    }
     if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) {
       throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.")
     }
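With the first check removed, `validate` now accepts the case where neither sizing option is set (the block size will come from BDWs at read time) and still rejects the case where both are set. A quick illustration using the API shown in this diff:

```scala
// Neither block length nor records-per-block: now valid (BDW-driven block sizes).
FixedBlockParameters.validate(FixedBlockParameters(Some(1), None, None))

// Both at once: still rejected with IllegalArgumentException
// ("...either block length or number records per block..., but not both.").
// FixedBlockParameters.validate(FixedBlockParameters(Some(1), Some(10), Some(5)))
```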

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala

Lines changed: 11 additions & 5 deletions

@@ -22,7 +22,7 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters
   private val recordQueue = new mutable.Queue[Array[Byte]]
 
   private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
-  private val bdwSize = fbParams.blockLength.getOrElse(fbParams.recordsPerBlock.get * recordSize)
+  private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))
 
   override def offset: Long = ctx.inputStream.offset
 
@@ -35,14 +35,20 @@
 
   private def readNextBlock(): Unit = {
     if (!ctx.inputStream.isEndOfStream) {
-      val bdwOffset = ctx.inputStream.offset
-      val blockBuffer = ctx.inputStream.next(bdwSize)
+      var bdwOffset = ctx.inputStream.offset
+
+      val nextBlockSize = bdwSize.getOrElse({
+        val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize)
+        val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
+        bdwOffset += ctx.bdwDecoder.headerSize
+        blockLength
+      })
+
+      val blockBuffer = ctx.inputStream.next(nextBlockSize)
 
       var blockIndex = 0
 
       while (blockIndex < blockBuffer.length) {
-        val rdwOffset = bdwOffset + blockIndex
-
         val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize)
         if (payload.length > 0) {
           recordQueue.enqueue(payload)
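The net effect of this change: `bdwSize` becomes an `Option[Int]`, and when neither `block_length` nor `records_per_block` is configured, the block size is decoded from the BDW header at the start of each block. A minimal standalone sketch of that resolution order (simplified names; `readBdwHeader` stands in for the `ctx.bdwDecoder` calls above):

```scala
// Sketch: block-size resolution order after this commit.
// explicit block length > records-per-block * record size > BDW header from the stream
def resolveBlockSize(blockLength: Option[Int],
                     recordsPerBlock: Option[Int],
                     recordSize: Int)
                    (readBdwHeader: () => Int): Int =
  blockLength
    .orElse(recordsPerBlock.map(_ * recordSize))
    .getOrElse(readBdwHeader()) // BDW-only files land here
```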

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala

Lines changed: 5 additions & 8 deletions

@@ -99,19 +99,16 @@ class FixedBlockRawRecordExtractorSuite extends WordSpec {
       assert(extractor.next().head == 0xF8.toByte)
       assert(!extractor.hasNext)
     }
-  }
 
-  "failures" should {
-    "throw an exception when neither block length nor records per block is specified" in {
+    "allow neither block length nor records per block to be specified" in {
       val fb = FixedBlockParameters(Some(1), None, None)
 
-      val ex = intercept[IllegalArgumentException] {
-        FixedBlockParameters.validate(fb)
-      }
-
-      assert(ex.getMessage.contains("FB record format requires block length or number records per block to be specified."))
+      assert(fb.blockLength.isEmpty)
+      assert(fb.recordsPerBlock.isEmpty)
     }
+  }
 
+  "failures" should {
     "throw an exception when both block length and records per block are specified" in {
       val fb = FixedBlockParameters(Some(1), Some(1), Some(1))
 

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala

Lines changed: 0 additions & 3 deletions

@@ -325,9 +325,6 @@ object CobolParametersParser {
     if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) {
       throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and '$PARAM_RECORDS_PER_BLOCK' cannot be used together.")
     }
-    if (recordFormat == FixedBlock && bdw.blockLength.isEmpty && bdw.recordsPerBlock.isEmpty) {
-      throw new IllegalArgumentException(s"For FB file format either '$PARAM_BLOCK_LENGTH' or '$PARAM_RECORDS_PER_BLOCK' must be specified.")
-    }
     if (recordFormat == VariableBlock && bdw.blockLength.nonEmpty) {
       logger.warn(s"Option '$PARAM_BLOCK_LENGTH' is ignored for record format: VB")
     }

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala

Lines changed: 25 additions & 11 deletions

@@ -54,6 +54,20 @@ class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture
       """[{"A":"0"},{"A":"0"},{"A":"1"},{"A":"1"}]""",
       ignoreCount = true)
     }
+
+    "load data with BDWs" in {
+      testVbRecordLoad(2, 2,
+        Map[String, String](),
+        expected4Records,
+        hasBDW = true)
+    }
+
+    "load empty data with BDWs" in {
+      testVbRecordLoad(0, 0,
+        Map[String, String](),
+        "[]",
+        hasBDW = true)
+    }
   }
 
   "FB record failures should happen" when {

@@ -80,31 +94,31 @@
 
       assert(ex.getMessage.contains("Options 'block_length' and 'records_per_block' cannot be used together."))
     }
-
-    "Mandatory options are missing" in {
-      val ex = intercept[IllegalArgumentException] {
-        testVbRecordLoad(1, 2, Map[String, String](), "")
-      }
-
-      assert(ex.getMessage.contains("For FB file format either 'block_length' or 'records_per_block' must be specified."))
-    }
   }
 
   private def testVbRecordLoad(blocks: Int,
                                records: Int,
                                options: Map[String, String],
                                expected: String,
-                               ignoreCount: Boolean = false): Unit = {
+                               ignoreCount: Boolean = false,
+                               hasBDW: Boolean = false): Unit = {
     val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => {
-      Range(0, records).flatMap(recordNum => {
+      val bdw: Seq[Byte] = if (hasBDW) {
+        val byte0 = ((records * 2) % 256).toByte
+        val byte1 = ((records * 2) / 256).toByte
+        Seq(0, 0, byte0, byte1)
+      } else {
+        Nil
+      }
+      bdw ++ Range(0, records).flatMap(recordNum => {
         val idx = (blockNum * records + recordNum) % 10
         val v = (0xF0 + idx).toByte
         Seq(v, v)
       })
     })
 
     withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 =>
-       val df = spark
+      val df = spark
        .read
        .format("cobol")
        .option("copybook_contents", copybook)
